refactor(fileio): use a linear buffer for FileDescriptor

Using a ring buffer for buffered synchronous fileio is just unnecessary
complexity.

- when reading, we always consume the _entire_ buffer before getting
  into syscalls. Thus we reset the buffer to its initial position before
  when we actually read.
- when writing and buffer is full, we always flush the entire buffer
  before starting to buffer again. So we can reset the buffer to its
  initial state.

Also no static buffers are needed for writing and skipping. Needing an
extra copy for each write completely defeated the purpose of
a ring buffer (if there had been one)
This commit is contained in:
bfredl 2024-05-28 12:52:24 +02:00
parent f55767afe2
commit 064483a2b4
4 changed files with 144 additions and 159 deletions

View File

@ -113,7 +113,6 @@ void mpack_handle(ObjectType type, handle_T handle, PackerBuffer *packer)
mpack_w(&packer->ptr, 0xc7);
mpack_w(&packer->ptr, packsize);
mpack_w(&packer->ptr, exttype);
// check_buffer(packer);
memcpy(packer->ptr, buf, (size_t)packsize);
packer->ptr += packsize;
}

View File

@ -120,12 +120,9 @@ int file_open_fd(FileDescriptor *const ret_fp, const int fd, const int flags)
assert(!ret_fp->wr || !ret_fp->non_blocking);
ret_fp->fd = fd;
ret_fp->eof = false;
ret_fp->rv = rbuffer_new(kRWBufferSize);
ret_fp->_error = 0;
if (ret_fp->wr) {
ret_fp->rv->data = ret_fp;
ret_fp->rv->full_cb = (rbuffer_callback)&file_rb_write_full_cb;
}
ret_fp->buffer = alloc_block();
ret_fp->read_pos = ret_fp->buffer;
ret_fp->write_pos = ret_fp->buffer;
ret_fp->bytes_read = 0;
return 0;
}
@ -148,8 +145,9 @@ void file_open_buffer(FileDescriptor *ret_fp, char *data, size_t len)
ret_fp->non_blocking = false;
ret_fp->fd = -1;
ret_fp->eof = true;
ret_fp->rv = rbuffer_new_wrap_buf(data, len);
ret_fp->_error = 0;
ret_fp->buffer = NULL; // we don't take ownership
ret_fp->read_pos = data;
ret_fp->write_pos = data + len;
ret_fp->bytes_read = 0;
}
@ -163,36 +161,18 @@ int file_close(FileDescriptor *const fp, const bool do_fsync)
FUNC_ATTR_NONNULL_ALL
{
if (fp->fd < 0) {
rbuffer_free(fp->rv);
return 0;
}
const int flush_error = (do_fsync ? file_fsync(fp) : file_flush(fp));
const int close_error = os_close(fp->fd);
rbuffer_free(fp->rv);
free_block(fp->buffer);
if (close_error != 0) {
return close_error;
}
return flush_error;
}
/// Flush file modifications to disk
///
/// @param[in,out] fp File to work with.
///
/// @return 0 or error code.
int file_flush(FileDescriptor *const fp)
FUNC_ATTR_NONNULL_ALL
{
if (!fp->wr) {
return 0;
}
file_rb_write_full_cb(fp->rv, fp);
const int error = fp->_error;
fp->_error = 0;
return error;
}
/// Flush file modifications to disk and run fsync()
///
/// @param[in,out] fp File to work with.
@ -218,36 +198,29 @@ int file_fsync(FileDescriptor *const fp)
return 0;
}
/// Buffer used for writing
/// Flush file modifications to disk
///
/// Like IObuff, but allows file_\* callers not to care about spoiling it.
static char writebuf[kRWBufferSize];
/// Function run when RBuffer is full when writing to a file
///
/// Actually does writing to the file, may also be invoked directly.
///
/// @param[in,out] rv RBuffer instance used.
/// @param[in,out] fp File to work with.
static void file_rb_write_full_cb(RBuffer *const rv, void *const fp_in)
///
/// @return 0 or error code.
int file_flush(FileDescriptor *fp)
FUNC_ATTR_NONNULL_ALL
{
FileDescriptor *const fp = fp_in;
assert(fp->wr);
assert(rv->data == (void *)fp);
if (rbuffer_size(rv) == 0) {
return;
if (!fp->wr) {
return 0;
}
const size_t read_bytes = rbuffer_read(rv, writebuf, kRWBufferSize);
const ptrdiff_t wres = os_write(fp->fd, writebuf, read_bytes,
ptrdiff_t to_write = fp->write_pos - fp->read_pos;
if (to_write == 0) {
return 0;
}
const ptrdiff_t wres = os_write(fp->fd, fp->read_pos, (size_t)to_write,
fp->non_blocking);
if (wres != (ptrdiff_t)read_bytes) {
if (wres >= 0) {
fp->_error = UV_EIO;
} else {
fp->_error = (int)wres;
}
fp->read_pos = fp->write_pos = fp->buffer;
if (wres != to_write) {
return (wres >= 0) ? UV_EIO : (int)wres;
}
return 0;
}
/// Read from file
@ -262,77 +235,78 @@ ptrdiff_t file_read(FileDescriptor *const fp, char *const ret_buf, const size_t
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT
{
assert(!fp->wr);
char *buf = ret_buf;
size_t read_remaining = size;
RBuffer *const rv = fp->rv;
size_t from_buffer = MIN((size_t)(fp->write_pos - fp->read_pos), size);
memcpy(ret_buf, fp->read_pos, from_buffer);
char *buf = ret_buf + from_buffer;
size_t read_remaining = size - from_buffer;
if (!read_remaining) {
fp->bytes_read += from_buffer;
fp->read_pos += from_buffer;
return (ptrdiff_t)from_buffer;
}
// at this point, we have consumed all of an existing buffer. restart from the beginning
fp->read_pos = fp->write_pos = fp->buffer;
#ifdef HAVE_READV
bool called_read = false;
while (read_remaining) {
const size_t rv_size = rbuffer_size(rv);
if (rv_size > 0) {
const size_t rsize = rbuffer_read(rv, buf, MIN(rv_size, read_remaining));
buf += rsize;
read_remaining -= rsize;
}
if (fp->eof
// Allow only at most one os_read[v] call.
|| (called_read && fp->non_blocking)) {
// Allow only at most one os_read[v] call.
if (fp->eof || (called_read && fp->non_blocking)) {
break;
}
if (read_remaining) {
assert(rbuffer_size(rv) == 0);
rbuffer_reset(rv);
#ifdef HAVE_READV
// If there is readv() syscall, then take an opportunity to populate
// both target buffer and RBuffer at once, …
size_t write_count;
struct iovec iov[] = {
{ .iov_base = buf, .iov_len = read_remaining },
{ .iov_base = rbuffer_write_ptr(rv, &write_count),
.iov_len = kRWBufferSize },
};
assert(write_count == kRWBufferSize);
const ptrdiff_t r_ret = os_readv(fp->fd, &fp->eof, iov,
ARRAY_SIZE(iov), fp->non_blocking);
if (r_ret > 0) {
if (r_ret > (ptrdiff_t)read_remaining) {
rbuffer_produced(rv, (size_t)(r_ret - (ptrdiff_t)read_remaining));
read_remaining = 0;
} else {
buf += (size_t)r_ret;
read_remaining -= (size_t)r_ret;
}
} else if (r_ret < 0) {
return r_ret;
}
#else
if (read_remaining >= kRWBufferSize) {
// …otherwise leave RBuffer empty and populate only target buffer,
// because filtering information through rbuffer will be more syscalls.
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof, buf, read_remaining,
fp->non_blocking);
if (r_ret >= 0) {
read_remaining -= (size_t)r_ret;
fp->bytes_read += (size - read_remaining);
return (ptrdiff_t)(size - read_remaining);
} else if (r_ret < 0) {
return r_ret;
}
// If there is readv() syscall, then take an opportunity to populate
// both target buffer and RBuffer at once, …
struct iovec iov[] = {
{ .iov_base = buf, .iov_len = read_remaining },
{ .iov_base = fp->write_pos,
.iov_len = ARENA_BLOCK_SIZE },
};
const ptrdiff_t r_ret = os_readv(fp->fd, &fp->eof, iov,
ARRAY_SIZE(iov), fp->non_blocking);
if (r_ret > 0) {
if (r_ret > (ptrdiff_t)read_remaining) {
fp->write_pos += (size_t)(r_ret - (ptrdiff_t)read_remaining);
read_remaining = 0;
} else {
size_t write_count;
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof,
rbuffer_write_ptr(rv, &write_count),
kRWBufferSize, fp->non_blocking);
assert(write_count == kRWBufferSize);
if (r_ret > 0) {
rbuffer_produced(rv, (size_t)r_ret);
} else if (r_ret < 0) {
return r_ret;
}
buf += r_ret;
read_remaining -= (size_t)r_ret;
}
#endif
called_read = true;
} else if (r_ret < 0) {
return r_ret;
}
called_read = true;
}
#else
if (fp->eof) {
// already eof, cannot read more
} else if (read_remaining >= ARENA_BLOCK_SIZE) {
// …otherwise leave fp->buffer empty and populate only target buffer,
// because filtering information through rbuffer will be more syscalls.
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof, buf, read_remaining,
fp->non_blocking);
if (r_ret >= 0) {
read_remaining -= (size_t)r_ret;
} else if (r_ret < 0) {
return r_ret;
}
} else {
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof,
fp->write_pos,
ARENA_BLOCK_SIZE, fp->non_blocking);
if (r_ret < 0) {
return r_ret;
} else {
fp->write_pos += r_ret;
size_t to_copy = MIN((size_t)r_ret, read_remaining);
memcpy(ret_buf, fp->read_pos, to_copy);
fp->read_pos += to_copy;
read_remaining -= to_copy;
}
}
#endif
fp->bytes_read += (size - read_remaining);
return (ptrdiff_t)(size - read_remaining);
}
@ -348,40 +322,68 @@ ptrdiff_t file_write(FileDescriptor *const fp, const char *const buf, const size
FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_ARG(1)
{
assert(fp->wr);
const size_t written = rbuffer_write(fp->rv, buf, size);
if (fp->_error != 0) {
const int error = fp->_error;
fp->_error = 0;
return error;
} else if (written != size) {
return UV_EIO;
ptrdiff_t space = (fp->buffer + ARENA_BLOCK_SIZE) - fp->write_pos;
// includes the trivial case of size==0
if (size < (size_t)space) {
memcpy(fp->write_pos, buf, size);
fp->write_pos += size;
return (ptrdiff_t)size;
}
return (ptrdiff_t)written;
}
/// Buffer used for skipping. Its contents is undefined and should never be
/// used.
static char skipbuf[kRWBufferSize];
// TODO(bfredl): just as for reading, use iovec to combine fp->buffer with buf
int status = file_flush(fp);
if (status < 0) {
return status;
}
if (size < ARENA_BLOCK_SIZE) {
memcpy(fp->write_pos, buf, size);
fp->write_pos += size;
return (ptrdiff_t)size;
}
const ptrdiff_t wres = os_write(fp->fd, buf, size,
fp->non_blocking);
return (wres != (ptrdiff_t)size && wres >= 0) ? UV_EIO : wres;
}
/// Skip some bytes
///
/// This is like `fseek(fp, size, SEEK_CUR)`, but actual implementation simply
/// reads to a buffer and discards the result.
/// reads to the buffer and discards the result.
ptrdiff_t file_skip(FileDescriptor *const fp, const size_t size)
FUNC_ATTR_NONNULL_ALL
{
assert(!fp->wr);
size_t read_bytes = 0;
do {
const ptrdiff_t new_read_bytes =
file_read(fp, skipbuf, MIN(size - read_bytes, sizeof(skipbuf)));
if (new_read_bytes < 0) {
return new_read_bytes;
} else if (new_read_bytes == 0) {
size_t from_buffer = MIN((size_t)(fp->write_pos - fp->read_pos), size);
size_t skip_remaining = size - from_buffer;
if (skip_remaining == 0) {
fp->read_pos += from_buffer;
fp->bytes_read += from_buffer;
return (ptrdiff_t)from_buffer;
}
fp->read_pos = fp->write_pos = fp->buffer;
bool called_read = false;
while (skip_remaining > 0) {
// Allow only at most one os_read[v] call.
if (fp->eof || (called_read && fp->non_blocking)) {
break;
}
read_bytes += (size_t)new_read_bytes;
} while (read_bytes < size && !file_eof(fp));
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof, fp->buffer, ARENA_BLOCK_SIZE,
fp->non_blocking);
if (r_ret < 0) {
return r_ret;
} else if ((size_t)r_ret > skip_remaining) {
fp->read_pos = fp->buffer + skip_remaining;
fp->write_pos = fp->buffer + r_ret;
fp->bytes_read += size;
return (ptrdiff_t)size;
}
skip_remaining -= (size_t)r_ret;
called_read = true;
}
return (ptrdiff_t)read_bytes;
fp->bytes_read += size - skip_remaining;
return (ptrdiff_t)(size - skip_remaining);
}

View File

@ -8,9 +8,10 @@
/// Structure used to read from/write to file
typedef struct {
int fd; ///< File descriptor.
int _error; ///< Error code for use with RBuffer callbacks or zero.
RBuffer *rv; ///< Read or write buffer.
int fd; ///< File descriptor. Can be -1 if no backing file (file_open_buffer)
char *buffer; ///< Read or write buffer. always ARENA_BLOCK_SIZE if allocated
char *read_pos; ///< read position in buffer
char *write_pos; ///< write position in buffer
bool wr; ///< True if file is in write mode.
bool eof; ///< True if end of file was encountered.
bool non_blocking; ///< True if EAGAIN should not restart syscalls.
@ -28,7 +29,7 @@ static inline bool file_eof(const FileDescriptor *fp)
/// performed.
static inline bool file_eof(const FileDescriptor *const fp)
{
return fp->eof && rbuffer_size(fp->rv) == 0;
return fp->eof && fp->read_pos == fp->write_pos;
}
static inline int file_fd(const FileDescriptor *fp)

View File

@ -29,23 +29,6 @@ RBuffer *rbuffer_new(size_t capacity)
return rv;
}
/// Creates a new `RBuffer` instance for reading from a buffer.
///
/// Must not be used with any write function like rbuffer_write_ptr or rbuffer_produced!
RBuffer *rbuffer_new_wrap_buf(char *data, size_t len)
FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_RET
{
RBuffer *rv = xcalloc(1, sizeof(RBuffer));
rv->full_cb = rv->nonfull_cb = NULL;
rv->data = NULL;
rv->size = len;
rv->read_ptr = data;
rv->write_ptr = data + len;
rv->end_ptr = NULL;
rv->temp = NULL;
return rv;
}
void rbuffer_free(RBuffer *buf) FUNC_ATTR_NONNULL_ALL
{
xfree(buf->temp);
@ -146,7 +129,7 @@ void rbuffer_consumed(RBuffer *buf, size_t count)
assert(count <= buf->size);
buf->read_ptr += count;
if (buf->end_ptr && buf->read_ptr >= buf->end_ptr) {
if (buf->read_ptr >= buf->end_ptr) {
buf->read_ptr -= rbuffer_capacity(buf);
}