Create EventType for RStream reading

RStream will be the main way Neovim receives asynchronous messages, so it is
best to have a specialized EventType for it. A new flag parameter was added to
`rstream_new` which tells the RStream instance to defer event handling for later
with KE_EVENT instead of handling it directly from libuv callback.
This commit is contained in:
Thiago de Arruda 2014-04-16 21:17:22 -03:00
parent c40428c934
commit 350144f511
7 changed files with 64 additions and 14 deletions

View File

@ -8,6 +8,7 @@
#include "os/event.h"
#include "os/input.h"
#include "os/signal.h"
#include "os/rstream.h"
#include "os/job.h"
#include "vim.h"
#include "memory.h"
@ -112,6 +113,9 @@ void event_process()
case kEventJobActivity:
job_handle(event);
break;
case kEventRStreamData:
rstream_read_event(event);
break;
default:
abort();
}

View File

@ -6,13 +6,18 @@
typedef enum {
kEventSignal,
kEventJobActivity
kEventJobActivity,
kEventRStreamData
} EventType;
typedef struct {
EventType type;
union {
int signum;
struct {
RStream *ptr;
bool eof;
} rstream;
struct {
Job *ptr;
RStream *target;

View File

@ -34,7 +34,7 @@ static int push_event_key(uint8_t *buf, int maxlen);
void input_init()
{
read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL);
read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false);
rstream_set_file(read_stream, read_cmd_fd);
}

View File

@ -159,8 +159,8 @@ int job_start(char **argv, void *data, job_read_cb cb)
}
// Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, false);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);

View File

@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdbool.h>
#include "os/event_defs.h"
#include "os/event.h"
/// Initializes job control resources

View File

@ -6,6 +6,8 @@
#include "os/rstream_defs.h"
#include "os/rstream.h"
#include "os/event_defs.h"
#include "os/event.h"
#include "vim.h"
#include "memory.h"
@ -19,20 +21,25 @@ struct rstream {
uv_file fd;
rstream_cb cb;
uint32_t buffer_size, rpos, wpos, fpos;
bool reading, free_handle;
bool reading, free_handle, async;
};
// Callbacks used by libuv
static void alloc_cb(uv_handle_t *, size_t, uv_buf_t *);
static void read_cb(uv_stream_t *, ssize_t, const uv_buf_t *);
static void fread_idle_cb(uv_idle_t *);
static void emit_read_event(RStream *rstream, bool eof);
RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data)
RStream * rstream_new(rstream_cb cb,
uint32_t buffer_size,
void *data,
bool async)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
rv->async = async;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
@ -162,6 +169,13 @@ uint32_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
void rstream_read_event(Event event)
{
RStream *rstream = event.data.rstream.ptr;
rstream->cb(rstream, rstream->data, event.data.rstream.eof);
}
// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
@ -191,7 +205,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
rstream->cb(rstream, rstream->data, true);
emit_read_event(rstream, true);
}
return;
}
@ -205,11 +219,8 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
rstream_stop(rstream);
}
// Invoke the callback passing in the number of bytes available and data
// associated with the stream
rstream->cb(rstream, rstream->data, false);
rstream->reading = false;
emit_read_event(rstream, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@ -235,7 +246,7 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
rstream->cb(rstream, rstream->data, true);
emit_read_event(rstream, true);
return;
}
@ -247,5 +258,21 @@ static void fread_idle_cb(uv_idle_t *handle)
rstream_stop(rstream);
}
rstream->cb(rstream, rstream->data, false);
emit_read_event(rstream, false);
}
static void emit_read_event(RStream *rstream, bool eof)
{
if (rstream->async) {
Event event;
event.type = kEventRStreamData;
event.data.rstream.ptr = rstream;
event.data.rstream.eof = eof;
event_push(event);
} else {
// Invoke the callback passing in the number of bytes available and data
// associated with the stream
rstream->cb(rstream, rstream->data, eof);
}
}

View File

@ -5,6 +5,7 @@
#include <stdint.h>
#include <uv.h>
#include "os/event_defs.h"
#include "os/rstream_defs.h"
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
@ -14,8 +15,15 @@
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
/// @param async Flag that specifies if the callback should only be called
/// outside libuv event loop(When processing async events with
/// KE_EVENT). Only the RStream instance reading user input should set
/// this to false
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb, uint32_t buffer_size, void *data);
RStream * rstream_new(rstream_cb cb,
uint32_t buffer_size,
void *data,
bool async);
/// Frees all memory allocated for a RStream instance
///
@ -71,5 +79,10 @@ uint32_t rstream_read(RStream *rstream, char *buffer, uint32_t count);
/// @return The number of bytes available
uint32_t rstream_available(RStream *rstream);
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
void rstream_read_event(Event event);
#endif // NEOVIM_OS_RSTREAM_H