channels: more consistent event handling

terminal: libvterm now receives data in async context. This was "almost" safe
already, as redraws were queued anyway.
This commit is contained in:
Björn Linse 2017-08-27 12:42:26 +02:00
parent f629f8312d
commit fee367a74f
2 changed files with 22 additions and 28 deletions

View File

@ -21,7 +21,7 @@ static uint64_t next_chan_id = CHAN_STDERR+1;
typedef struct { typedef struct {
Channel *data; Channel *chan;
Callback *callback; Callback *callback;
const char *type; const char *type;
list_T *received; list_T *received;
@ -348,9 +348,6 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
has_err = callback_reader_set(chan->on_stderr); has_err = callback_reader_set(chan->on_stderr);
} }
int status = process_spawn(proc, true, has_out, has_err); int status = process_spawn(proc, true, has_out, has_err);
if (has_err) {
proc->err.events = chan->events;
}
if (status) { if (status) {
EMSG3(_(e_jobspawn), os_strerror(status), cmd); EMSG3(_(e_jobspawn), os_strerror(status), cmd);
xfree(cmd); xfree(cmd);
@ -372,10 +369,8 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout,
// the rpc takes over the in and out streams // the rpc takes over the in and out streams
rpc_start(chan); rpc_start(chan);
} else { } else {
proc->in.events = chan->events;
if (has_out) { if (has_out) {
callback_reader_start(&chan->on_stdout); callback_reader_start(&chan->on_stdout);
proc->out.events = chan->events;
rstream_start(&proc->out, on_job_stdout, chan); rstream_start(&proc->out, on_job_stdout, chan);
} }
} }
@ -422,7 +417,6 @@ uint64_t channel_connect(bool tcp, const char *address,
} else { } else {
channel->on_stdout = on_output; channel->on_stdout = on_output;
callback_reader_start(&channel->on_stdout); callback_reader_start(&channel->on_stdout);
channel->stream.socket.events = channel->events;
rstream_start(&channel->stream.socket, on_socket_output, channel); rstream_start(&channel->stream.socket, on_socket_output, channel);
} }
@ -481,8 +475,6 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
} else { } else {
channel->on_stdout = on_output; channel->on_stdout = on_output;
callback_reader_start(&channel->on_stdout); callback_reader_start(&channel->on_stdout);
channel->stream.stdio.in.events = channel->events;
channel->stream.stdio.out.events = channel->events;
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel); rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
} }
@ -534,10 +526,11 @@ static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type, char *buf, const char *type, char *buf,
size_t count, int status) size_t count, int status)
{ {
ChannelEvent event_data; assert(callback);
event_data.received = NULL; ChannelEvent *event_data = xmalloc(sizeof(*event_data));
event_data->received = NULL;
if (buf) { if (buf) {
event_data.received = tv_list_alloc(); event_data->received = tv_list_alloc();
char *ptr = buf; char *ptr = buf;
size_t remaining = count; size_t remaining = count;
size_t off = 0; size_t off = 0;
@ -545,7 +538,7 @@ static inline void process_channel_event(Channel *chan, Callback *callback,
while (off < remaining) { while (off < remaining) {
// append the line // append the line
if (ptr[off] == NL) { if (ptr[off] == NL) {
tv_list_append_string(event_data.received, ptr, (ssize_t)off); tv_list_append_string(event_data->received, ptr, (ssize_t)off);
size_t skip = off + 1; size_t skip = off + 1;
ptr += skip; ptr += skip;
remaining -= skip; remaining -= skip;
@ -558,14 +551,16 @@ static inline void process_channel_event(Channel *chan, Callback *callback,
} }
off++; off++;
} }
tv_list_append_string(event_data.received, ptr, (ssize_t)off); tv_list_append_string(event_data->received, ptr, (ssize_t)off);
} else { } else {
event_data.status = status; event_data->status = status;
} }
event_data.data = chan; channel_incref(chan); // Hold on ref to callback
event_data.callback = callback; event_data->chan = chan;
event_data.type = type; event_data->callback = callback;
on_channel_event(&event_data); event_data->type = type;
multiqueue_put(chan->events, on_channel_event, 1, event_data);
} }
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
@ -608,7 +603,7 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
if (eof) { if (eof) {
if (reader->buffered) { if (reader->buffered) {
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data, process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
(size_t)reader->buffer.ga_len, 0); (size_t)reader->buffer.ga_len, 0);
ga_clear(&reader->buffer); ga_clear(&reader->buffer);
} else if (callback_reader_set(*reader)) { } else if (callback_reader_set(*reader)) {
process_channel_event(chan, &reader->cb, type, ptr, 0, 0); process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
@ -648,17 +643,15 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
channel_decref(chan); channel_decref(chan);
} }
static void on_channel_event(ChannelEvent *ev) static void on_channel_event(void **args)
{ {
if (!ev->callback) { ChannelEvent *ev = (ChannelEvent *)args[0];
return;
}
typval_T argv[4]; typval_T argv[4];
argv[0].v_type = VAR_NUMBER; argv[0].v_type = VAR_NUMBER;
argv[0].v_lock = VAR_UNLOCKED; argv[0].v_lock = VAR_UNLOCKED;
argv[0].vval.v_number = (varnumber_T)ev->data->id; argv[0].vval.v_number = (varnumber_T)ev->chan->id;
if (ev->received) { if (ev->received) {
argv[1].v_type = VAR_LIST; argv[1].v_type = VAR_LIST;
@ -678,6 +671,8 @@ static void on_channel_event(ChannelEvent *ev)
typval_T rettv = TV_INITIAL_VALUE; typval_T rettv = TV_INITIAL_VALUE;
callback_call(ev->callback, 3, argv, &rettv); callback_call(ev->callback, 3, argv, &rettv);
tv_clear(&rettv); tv_clear(&rettv);
channel_decref(ev->chan);
xfree(ev);
} }

View File

@ -1094,11 +1094,12 @@ static void refresh_terminal(Terminal *term)
// Calls refresh_terminal() on all invalidated_terminals. // Calls refresh_terminal() on all invalidated_terminals.
static void refresh_timer_cb(TimeWatcher *watcher, void *data) static void refresh_timer_cb(TimeWatcher *watcher, void *data)
{ {
refresh_pending = false;
if (exiting // Cannot redraw (requires event loop) during teardown/exit. if (exiting // Cannot redraw (requires event loop) during teardown/exit.
// WM_LIST (^D) is not redrawn, unlike the normal wildmenu. So we must // WM_LIST (^D) is not redrawn, unlike the normal wildmenu. So we must
// skip redraws to keep it visible. // skip redraws to keep it visible.
|| wild_menu_showing == WM_LIST) { || wild_menu_showing == WM_LIST) {
goto end; return;
} }
Terminal *term; Terminal *term;
void *stub; (void)(stub); void *stub; (void)(stub);
@ -1113,8 +1114,6 @@ static void refresh_timer_cb(TimeWatcher *watcher, void *data)
if (any_visible) { if (any_visible) {
redraw(true); redraw(true);
} }
end:
refresh_pending = false;
} }
static void refresh_size(Terminal *term, buf_T *buf) static void refresh_size(Terminal *term, buf_T *buf)