mirror of
https://github.com/neovim/neovim.git
synced 2025-02-25 18:55:25 -06:00
refactor(fileio): use a linear buffer for FileDescriptor
Using a ring buffer for buffered synchronous fileio is just unnecessary complexity. - when reading, we always consume the _entire_ buffer before getting into syscalls. Thus we reset the buffer to its initial position before when we actually read. - when writing and buffer is full, we always flush the entire buffer before starting to buffer again. So we can reset the buffer to its initial state. Also no static buffers are needed for writing and skipping. Needing an extra copy for each write completely defeated the purpose of a ring buffer (if there had been one)
This commit is contained in:
parent
f55767afe2
commit
064483a2b4
@ -113,7 +113,6 @@ void mpack_handle(ObjectType type, handle_T handle, PackerBuffer *packer)
|
||||
mpack_w(&packer->ptr, 0xc7);
|
||||
mpack_w(&packer->ptr, packsize);
|
||||
mpack_w(&packer->ptr, exttype);
|
||||
// check_buffer(packer);
|
||||
memcpy(packer->ptr, buf, (size_t)packsize);
|
||||
packer->ptr += packsize;
|
||||
}
|
||||
|
@ -120,12 +120,9 @@ int file_open_fd(FileDescriptor *const ret_fp, const int fd, const int flags)
|
||||
assert(!ret_fp->wr || !ret_fp->non_blocking);
|
||||
ret_fp->fd = fd;
|
||||
ret_fp->eof = false;
|
||||
ret_fp->rv = rbuffer_new(kRWBufferSize);
|
||||
ret_fp->_error = 0;
|
||||
if (ret_fp->wr) {
|
||||
ret_fp->rv->data = ret_fp;
|
||||
ret_fp->rv->full_cb = (rbuffer_callback)&file_rb_write_full_cb;
|
||||
}
|
||||
ret_fp->buffer = alloc_block();
|
||||
ret_fp->read_pos = ret_fp->buffer;
|
||||
ret_fp->write_pos = ret_fp->buffer;
|
||||
ret_fp->bytes_read = 0;
|
||||
return 0;
|
||||
}
|
||||
@ -148,8 +145,9 @@ void file_open_buffer(FileDescriptor *ret_fp, char *data, size_t len)
|
||||
ret_fp->non_blocking = false;
|
||||
ret_fp->fd = -1;
|
||||
ret_fp->eof = true;
|
||||
ret_fp->rv = rbuffer_new_wrap_buf(data, len);
|
||||
ret_fp->_error = 0;
|
||||
ret_fp->buffer = NULL; // we don't take ownership
|
||||
ret_fp->read_pos = data;
|
||||
ret_fp->write_pos = data + len;
|
||||
ret_fp->bytes_read = 0;
|
||||
}
|
||||
|
||||
@ -163,36 +161,18 @@ int file_close(FileDescriptor *const fp, const bool do_fsync)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (fp->fd < 0) {
|
||||
rbuffer_free(fp->rv);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const int flush_error = (do_fsync ? file_fsync(fp) : file_flush(fp));
|
||||
const int close_error = os_close(fp->fd);
|
||||
rbuffer_free(fp->rv);
|
||||
free_block(fp->buffer);
|
||||
if (close_error != 0) {
|
||||
return close_error;
|
||||
}
|
||||
return flush_error;
|
||||
}
|
||||
|
||||
/// Flush file modifications to disk
|
||||
///
|
||||
/// @param[in,out] fp File to work with.
|
||||
///
|
||||
/// @return 0 or error code.
|
||||
int file_flush(FileDescriptor *const fp)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (!fp->wr) {
|
||||
return 0;
|
||||
}
|
||||
file_rb_write_full_cb(fp->rv, fp);
|
||||
const int error = fp->_error;
|
||||
fp->_error = 0;
|
||||
return error;
|
||||
}
|
||||
|
||||
/// Flush file modifications to disk and run fsync()
|
||||
///
|
||||
/// @param[in,out] fp File to work with.
|
||||
@ -218,36 +198,29 @@ int file_fsync(FileDescriptor *const fp)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Buffer used for writing
|
||||
/// Flush file modifications to disk
|
||||
///
|
||||
/// Like IObuff, but allows file_\* callers not to care about spoiling it.
|
||||
static char writebuf[kRWBufferSize];
|
||||
|
||||
/// Function run when RBuffer is full when writing to a file
|
||||
///
|
||||
/// Actually does writing to the file, may also be invoked directly.
|
||||
///
|
||||
/// @param[in,out] rv RBuffer instance used.
|
||||
/// @param[in,out] fp File to work with.
|
||||
static void file_rb_write_full_cb(RBuffer *const rv, void *const fp_in)
|
||||
///
|
||||
/// @return 0 or error code.
|
||||
int file_flush(FileDescriptor *fp)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
FileDescriptor *const fp = fp_in;
|
||||
assert(fp->wr);
|
||||
assert(rv->data == (void *)fp);
|
||||
if (rbuffer_size(rv) == 0) {
|
||||
return;
|
||||
if (!fp->wr) {
|
||||
return 0;
|
||||
}
|
||||
const size_t read_bytes = rbuffer_read(rv, writebuf, kRWBufferSize);
|
||||
const ptrdiff_t wres = os_write(fp->fd, writebuf, read_bytes,
|
||||
|
||||
ptrdiff_t to_write = fp->write_pos - fp->read_pos;
|
||||
if (to_write == 0) {
|
||||
return 0;
|
||||
}
|
||||
const ptrdiff_t wres = os_write(fp->fd, fp->read_pos, (size_t)to_write,
|
||||
fp->non_blocking);
|
||||
if (wres != (ptrdiff_t)read_bytes) {
|
||||
if (wres >= 0) {
|
||||
fp->_error = UV_EIO;
|
||||
} else {
|
||||
fp->_error = (int)wres;
|
||||
}
|
||||
fp->read_pos = fp->write_pos = fp->buffer;
|
||||
if (wres != to_write) {
|
||||
return (wres >= 0) ? UV_EIO : (int)wres;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Read from file
|
||||
@ -262,77 +235,78 @@ ptrdiff_t file_read(FileDescriptor *const fp, char *const ret_buf, const size_t
|
||||
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT
|
||||
{
|
||||
assert(!fp->wr);
|
||||
char *buf = ret_buf;
|
||||
size_t read_remaining = size;
|
||||
RBuffer *const rv = fp->rv;
|
||||
size_t from_buffer = MIN((size_t)(fp->write_pos - fp->read_pos), size);
|
||||
memcpy(ret_buf, fp->read_pos, from_buffer);
|
||||
|
||||
char *buf = ret_buf + from_buffer;
|
||||
size_t read_remaining = size - from_buffer;
|
||||
if (!read_remaining) {
|
||||
fp->bytes_read += from_buffer;
|
||||
fp->read_pos += from_buffer;
|
||||
return (ptrdiff_t)from_buffer;
|
||||
}
|
||||
|
||||
// at this point, we have consumed all of an existing buffer. restart from the beginning
|
||||
fp->read_pos = fp->write_pos = fp->buffer;
|
||||
|
||||
#ifdef HAVE_READV
|
||||
bool called_read = false;
|
||||
while (read_remaining) {
|
||||
const size_t rv_size = rbuffer_size(rv);
|
||||
if (rv_size > 0) {
|
||||
const size_t rsize = rbuffer_read(rv, buf, MIN(rv_size, read_remaining));
|
||||
buf += rsize;
|
||||
read_remaining -= rsize;
|
||||
}
|
||||
if (fp->eof
|
||||
// Allow only at most one os_read[v] call.
|
||||
|| (called_read && fp->non_blocking)) {
|
||||
// Allow only at most one os_read[v] call.
|
||||
if (fp->eof || (called_read && fp->non_blocking)) {
|
||||
break;
|
||||
}
|
||||
if (read_remaining) {
|
||||
assert(rbuffer_size(rv) == 0);
|
||||
rbuffer_reset(rv);
|
||||
#ifdef HAVE_READV
|
||||
// If there is readv() syscall, then take an opportunity to populate
|
||||
// both target buffer and RBuffer at once, …
|
||||
size_t write_count;
|
||||
struct iovec iov[] = {
|
||||
{ .iov_base = buf, .iov_len = read_remaining },
|
||||
{ .iov_base = rbuffer_write_ptr(rv, &write_count),
|
||||
.iov_len = kRWBufferSize },
|
||||
};
|
||||
assert(write_count == kRWBufferSize);
|
||||
const ptrdiff_t r_ret = os_readv(fp->fd, &fp->eof, iov,
|
||||
ARRAY_SIZE(iov), fp->non_blocking);
|
||||
if (r_ret > 0) {
|
||||
if (r_ret > (ptrdiff_t)read_remaining) {
|
||||
rbuffer_produced(rv, (size_t)(r_ret - (ptrdiff_t)read_remaining));
|
||||
read_remaining = 0;
|
||||
} else {
|
||||
buf += (size_t)r_ret;
|
||||
read_remaining -= (size_t)r_ret;
|
||||
}
|
||||
} else if (r_ret < 0) {
|
||||
return r_ret;
|
||||
}
|
||||
#else
|
||||
if (read_remaining >= kRWBufferSize) {
|
||||
// …otherwise leave RBuffer empty and populate only target buffer,
|
||||
// because filtering information through rbuffer will be more syscalls.
|
||||
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof, buf, read_remaining,
|
||||
fp->non_blocking);
|
||||
if (r_ret >= 0) {
|
||||
read_remaining -= (size_t)r_ret;
|
||||
fp->bytes_read += (size - read_remaining);
|
||||
return (ptrdiff_t)(size - read_remaining);
|
||||
} else if (r_ret < 0) {
|
||||
return r_ret;
|
||||
}
|
||||
// If there is readv() syscall, then take an opportunity to populate
|
||||
// both target buffer and RBuffer at once, …
|
||||
struct iovec iov[] = {
|
||||
{ .iov_base = buf, .iov_len = read_remaining },
|
||||
{ .iov_base = fp->write_pos,
|
||||
.iov_len = ARENA_BLOCK_SIZE },
|
||||
};
|
||||
const ptrdiff_t r_ret = os_readv(fp->fd, &fp->eof, iov,
|
||||
ARRAY_SIZE(iov), fp->non_blocking);
|
||||
if (r_ret > 0) {
|
||||
if (r_ret > (ptrdiff_t)read_remaining) {
|
||||
fp->write_pos += (size_t)(r_ret - (ptrdiff_t)read_remaining);
|
||||
read_remaining = 0;
|
||||
} else {
|
||||
size_t write_count;
|
||||
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof,
|
||||
rbuffer_write_ptr(rv, &write_count),
|
||||
kRWBufferSize, fp->non_blocking);
|
||||
assert(write_count == kRWBufferSize);
|
||||
if (r_ret > 0) {
|
||||
rbuffer_produced(rv, (size_t)r_ret);
|
||||
} else if (r_ret < 0) {
|
||||
return r_ret;
|
||||
}
|
||||
buf += r_ret;
|
||||
read_remaining -= (size_t)r_ret;
|
||||
}
|
||||
#endif
|
||||
called_read = true;
|
||||
} else if (r_ret < 0) {
|
||||
return r_ret;
|
||||
}
|
||||
called_read = true;
|
||||
}
|
||||
#else
|
||||
if (fp->eof) {
|
||||
// already eof, cannot read more
|
||||
} else if (read_remaining >= ARENA_BLOCK_SIZE) {
|
||||
// …otherwise leave fp->buffer empty and populate only target buffer,
|
||||
// because filtering information through rbuffer will be more syscalls.
|
||||
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof, buf, read_remaining,
|
||||
fp->non_blocking);
|
||||
if (r_ret >= 0) {
|
||||
read_remaining -= (size_t)r_ret;
|
||||
} else if (r_ret < 0) {
|
||||
return r_ret;
|
||||
}
|
||||
} else {
|
||||
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof,
|
||||
fp->write_pos,
|
||||
ARENA_BLOCK_SIZE, fp->non_blocking);
|
||||
if (r_ret < 0) {
|
||||
return r_ret;
|
||||
} else {
|
||||
fp->write_pos += r_ret;
|
||||
size_t to_copy = MIN((size_t)r_ret, read_remaining);
|
||||
memcpy(ret_buf, fp->read_pos, to_copy);
|
||||
fp->read_pos += to_copy;
|
||||
read_remaining -= to_copy;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
fp->bytes_read += (size - read_remaining);
|
||||
return (ptrdiff_t)(size - read_remaining);
|
||||
}
|
||||
@ -348,40 +322,68 @@ ptrdiff_t file_write(FileDescriptor *const fp, const char *const buf, const size
|
||||
FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_ARG(1)
|
||||
{
|
||||
assert(fp->wr);
|
||||
const size_t written = rbuffer_write(fp->rv, buf, size);
|
||||
if (fp->_error != 0) {
|
||||
const int error = fp->_error;
|
||||
fp->_error = 0;
|
||||
return error;
|
||||
} else if (written != size) {
|
||||
return UV_EIO;
|
||||
ptrdiff_t space = (fp->buffer + ARENA_BLOCK_SIZE) - fp->write_pos;
|
||||
// includes the trivial case of size==0
|
||||
if (size < (size_t)space) {
|
||||
memcpy(fp->write_pos, buf, size);
|
||||
fp->write_pos += size;
|
||||
return (ptrdiff_t)size;
|
||||
}
|
||||
return (ptrdiff_t)written;
|
||||
}
|
||||
|
||||
/// Buffer used for skipping. Its contents is undefined and should never be
|
||||
/// used.
|
||||
static char skipbuf[kRWBufferSize];
|
||||
// TODO(bfredl): just as for reading, use iovec to combine fp->buffer with buf
|
||||
int status = file_flush(fp);
|
||||
if (status < 0) {
|
||||
return status;
|
||||
}
|
||||
|
||||
if (size < ARENA_BLOCK_SIZE) {
|
||||
memcpy(fp->write_pos, buf, size);
|
||||
fp->write_pos += size;
|
||||
return (ptrdiff_t)size;
|
||||
}
|
||||
|
||||
const ptrdiff_t wres = os_write(fp->fd, buf, size,
|
||||
fp->non_blocking);
|
||||
return (wres != (ptrdiff_t)size && wres >= 0) ? UV_EIO : wres;
|
||||
}
|
||||
|
||||
/// Skip some bytes
|
||||
///
|
||||
/// This is like `fseek(fp, size, SEEK_CUR)`, but actual implementation simply
|
||||
/// reads to a buffer and discards the result.
|
||||
/// reads to the buffer and discards the result.
|
||||
ptrdiff_t file_skip(FileDescriptor *const fp, const size_t size)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
assert(!fp->wr);
|
||||
size_t read_bytes = 0;
|
||||
do {
|
||||
const ptrdiff_t new_read_bytes =
|
||||
file_read(fp, skipbuf, MIN(size - read_bytes, sizeof(skipbuf)));
|
||||
if (new_read_bytes < 0) {
|
||||
return new_read_bytes;
|
||||
} else if (new_read_bytes == 0) {
|
||||
size_t from_buffer = MIN((size_t)(fp->write_pos - fp->read_pos), size);
|
||||
size_t skip_remaining = size - from_buffer;
|
||||
if (skip_remaining == 0) {
|
||||
fp->read_pos += from_buffer;
|
||||
fp->bytes_read += from_buffer;
|
||||
return (ptrdiff_t)from_buffer;
|
||||
}
|
||||
|
||||
fp->read_pos = fp->write_pos = fp->buffer;
|
||||
bool called_read = false;
|
||||
while (skip_remaining > 0) {
|
||||
// Allow only at most one os_read[v] call.
|
||||
if (fp->eof || (called_read && fp->non_blocking)) {
|
||||
break;
|
||||
}
|
||||
read_bytes += (size_t)new_read_bytes;
|
||||
} while (read_bytes < size && !file_eof(fp));
|
||||
const ptrdiff_t r_ret = os_read(fp->fd, &fp->eof, fp->buffer, ARENA_BLOCK_SIZE,
|
||||
fp->non_blocking);
|
||||
if (r_ret < 0) {
|
||||
return r_ret;
|
||||
} else if ((size_t)r_ret > skip_remaining) {
|
||||
fp->read_pos = fp->buffer + skip_remaining;
|
||||
fp->write_pos = fp->buffer + r_ret;
|
||||
fp->bytes_read += size;
|
||||
return (ptrdiff_t)size;
|
||||
}
|
||||
skip_remaining -= (size_t)r_ret;
|
||||
called_read = true;
|
||||
}
|
||||
|
||||
return (ptrdiff_t)read_bytes;
|
||||
fp->bytes_read += size - skip_remaining;
|
||||
return (ptrdiff_t)(size - skip_remaining);
|
||||
}
|
||||
|
@ -8,9 +8,10 @@
|
||||
|
||||
/// Structure used to read from/write to file
|
||||
typedef struct {
|
||||
int fd; ///< File descriptor.
|
||||
int _error; ///< Error code for use with RBuffer callbacks or zero.
|
||||
RBuffer *rv; ///< Read or write buffer.
|
||||
int fd; ///< File descriptor. Can be -1 if no backing file (file_open_buffer)
|
||||
char *buffer; ///< Read or write buffer. always ARENA_BLOCK_SIZE if allocated
|
||||
char *read_pos; ///< read position in buffer
|
||||
char *write_pos; ///< write position in buffer
|
||||
bool wr; ///< True if file is in write mode.
|
||||
bool eof; ///< True if end of file was encountered.
|
||||
bool non_blocking; ///< True if EAGAIN should not restart syscalls.
|
||||
@ -28,7 +29,7 @@ static inline bool file_eof(const FileDescriptor *fp)
|
||||
/// performed.
|
||||
static inline bool file_eof(const FileDescriptor *const fp)
|
||||
{
|
||||
return fp->eof && rbuffer_size(fp->rv) == 0;
|
||||
return fp->eof && fp->read_pos == fp->write_pos;
|
||||
}
|
||||
|
||||
static inline int file_fd(const FileDescriptor *fp)
|
||||
|
@ -29,23 +29,6 @@ RBuffer *rbuffer_new(size_t capacity)
|
||||
return rv;
|
||||
}
|
||||
|
||||
/// Creates a new `RBuffer` instance for reading from a buffer.
|
||||
///
|
||||
/// Must not be used with any write function like rbuffer_write_ptr or rbuffer_produced!
|
||||
RBuffer *rbuffer_new_wrap_buf(char *data, size_t len)
|
||||
FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_RET
|
||||
{
|
||||
RBuffer *rv = xcalloc(1, sizeof(RBuffer));
|
||||
rv->full_cb = rv->nonfull_cb = NULL;
|
||||
rv->data = NULL;
|
||||
rv->size = len;
|
||||
rv->read_ptr = data;
|
||||
rv->write_ptr = data + len;
|
||||
rv->end_ptr = NULL;
|
||||
rv->temp = NULL;
|
||||
return rv;
|
||||
}
|
||||
|
||||
void rbuffer_free(RBuffer *buf) FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
xfree(buf->temp);
|
||||
@ -146,7 +129,7 @@ void rbuffer_consumed(RBuffer *buf, size_t count)
|
||||
assert(count <= buf->size);
|
||||
|
||||
buf->read_ptr += count;
|
||||
if (buf->end_ptr && buf->read_ptr >= buf->end_ptr) {
|
||||
if (buf->read_ptr >= buf->end_ptr) {
|
||||
buf->read_ptr -= rbuffer_capacity(buf);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user