Deal with backpressure on RStream instances

Each RStream instance will now stop its libuv watcher when the buffer is full,
and automatically restart when some data is read with `rstream_read`.
This commit is contained in:
Thiago de Arruda 2014-04-16 20:30:00 -03:00
parent 9d16a6b370
commit c40428c934

View File

@ -147,6 +147,11 @@ uint32_t rstream_read(RStream *rstream, char *buf, uint32_t count)
rstream->wpos - rstream->rpos); // ...By the number of unread bytes rstream->wpos - rstream->rpos); // ...By the number of unread bytes
rstream->wpos -= rstream->rpos; rstream->wpos -= rstream->rpos;
rstream->rpos = 0; rstream->rpos = 0;
if (rstream->wpos < rstream->buffer_size) {
// Restart reading since we have freed some space
rstream_start(rstream);
}
} }
return read_count; return read_count;
@ -167,8 +172,9 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
return; return;
} }
buf->base = rstream->buffer + rstream->wpos;
buf->len = rstream->buffer_size - rstream->wpos; buf->len = rstream->buffer_size - rstream->wpos;
buf->base = rstream->buffer + rstream->wpos;
// Avoid `alloc_cb`, `alloc_cb` sequences on windows // Avoid `alloc_cb`, `alloc_cb` sequences on windows
rstream->reading = true; rstream->reading = true;
} }
@ -193,10 +199,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect // Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer. // the space actually used in the buffer.
rstream->wpos += cnt; rstream->wpos += cnt;
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
}
// Invoke the callback passing in the number of bytes available and data // Invoke the callback passing in the number of bytes available and data
// associated with the stream // associated with the stream
rstream->cb(rstream, rstream->data, false); rstream->cb(rstream, rstream->data, false);
rstream->reading = false; rstream->reading = false;
} }
// Called by the by the 'idle' handle to emulate a reading event // Called by the by the 'idle' handle to emulate a reading event
@ -228,5 +241,11 @@ static void fread_idle_cb(uv_idle_t *handle)
rstream->wpos += req.result; rstream->wpos += req.result;
rstream->fpos += req.result; rstream->fpos += req.result;
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
}
rstream->cb(rstream, rstream->data, false); rstream->cb(rstream, rstream->data, false);
} }