Merge PR #3029 'Refactor event processing architecture'

Helped-by: oni-link <knil.ino@gmail.com>
Reviewed-by: oni-link <knil.ino@gmail.com>
This commit is contained in:
Thiago de Arruda 2015-08-13 12:20:53 -03:00
commit a94a68145b
52 changed files with 925 additions and 411 deletions

2
.asan-blacklist Normal file
View File

@ -0,0 +1,2 @@
# libuv queue.h pointer arithmetic is not accepted by asan
fun:queue_node_data

View File

@ -57,6 +57,7 @@ src/nvim/keymap.h
src/nvim/lib/khash.h
src/nvim/lib/klist.h
src/nvim/lib/kvec.h
src/nvim/lib/queue.h
src/nvim/macros.h
src/nvim/main.c
src/nvim/main.h

View File

@ -34,8 +34,8 @@ c_params = Ct(c_void + c_param_list)
c_proto = Ct(
Cg(c_type, 'return_type') * Cg(c_id, 'name') *
fill * P('(') * fill * Cg(c_params, 'parameters') * fill * P(')') *
Cg(Cc(false), 'deferred') *
(fill * Cg((P('FUNC_ATTR_DEFERRED') * Cc(true)), 'deferred') ^ -1) *
Cg(Cc(false), 'async') *
(fill * Cg((P('FUNC_ATTR_ASYNC') * Cc(true)), 'async') ^ -1) *
fill * P(';')
)
grammar = Ct((c_proto + c_comment + c_preproc + ws) ^ 1)
@ -279,7 +279,7 @@ for i = 1, #functions do
'(String) {.data = "'..fn.name..'", '..
'.size = sizeof("'..fn.name..'") - 1}, '..
'(MsgpackRpcRequestHandler) {.fn = handle_'.. fn.name..
', .defer = '..tostring(fn.deferred)..'});\n')
', .async = '..tostring(fn.async)..'});\n')
if #fn.name > max_fname_len then
max_fname_len = #fn.name

View File

@ -217,7 +217,7 @@ install_helper(TARGETS nvim)
if(CLANG_ASAN_UBSAN)
message(STATUS "Enabling Clang address sanitizer and undefined behavior sanitizer for nvim.")
set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-DEXITFREE ")
set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined ")
set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/.asan-blacklist")
set_property(TARGET nvim APPEND_STRING PROPERTY LINK_FLAGS "-fsanitize=address -fsanitize=undefined ")
elseif(CLANG_MSAN)
message(STATUS "Enabling Clang memory sanitizer for nvim.")

View File

@ -70,7 +70,6 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err)
/// @param line The new line.
/// @param[out] err Details of an error that may have occurred
void buffer_set_line(Buffer buffer, Integer index, String line, Error *err)
FUNC_ATTR_DEFERRED
{
Object l = STRING_OBJ(line);
Array array = {.items = &l, .size = 1};
@ -83,7 +82,6 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err)
/// @param index The line index
/// @param[out] err Details of an error that may have occurred
void buffer_del_line(Buffer buffer, Integer index, Error *err)
FUNC_ATTR_DEFERRED
{
Array array = ARRAY_DICT_INIT;
buffer_set_line_slice(buffer, index, index, true, true, array, err);
@ -171,7 +169,6 @@ void buffer_set_line_slice(Buffer buffer,
Boolean include_end,
ArrayOf(String) replacement,
Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -339,7 +336,6 @@ Object buffer_get_var(Buffer buffer, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The old value
Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -375,7 +371,6 @@ Object buffer_get_option(Buffer buffer, String name, Error *err)
/// @param value The option value
/// @param[out] err Details of an error that may have occurred
void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -426,7 +421,6 @@ String buffer_get_name(Buffer buffer, Error *err)
/// @param name The buffer name
/// @param[out] err Details of an error that may have occurred
void buffer_set_name(Buffer buffer, String name, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -472,7 +466,6 @@ void buffer_insert(Buffer buffer,
Integer lnum,
ArrayOf(String) lines,
Error *err)
FUNC_ATTR_DEFERRED
{
buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err);
}

View File

@ -62,7 +62,6 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The tab page handle
Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
tabpage_T *tab = find_tab_by_handle(tabpage, err);

View File

@ -37,7 +37,6 @@
/// @param str The command str
/// @param[out] err Details of an error that may have occurred
void vim_command(String str, Error *err)
FUNC_ATTR_DEFERRED
{
// Run the command
try_start();
@ -54,7 +53,6 @@ void vim_command(String str, Error *err)
/// @see feedkeys()
/// @see vim_strsave_escape_csi
void vim_feedkeys(String keys, String mode, Boolean escape_csi)
FUNC_ATTR_DEFERRED
{
bool remap = true;
bool insert = false;
@ -100,6 +98,7 @@ void vim_feedkeys(String keys, String mode, Boolean escape_csi)
/// @return The number of bytes actually written, which can be lower than
/// requested if the buffer becomes full.
Integer vim_input(String keys)
FUNC_ATTR_ASYNC
{
return (Integer)input_enqueue(keys);
}
@ -143,7 +142,6 @@ String vim_command_output(String str, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The expanded object
Object vim_eval(String str, Error *err)
FUNC_ATTR_DEFERRED
{
Object rv = OBJECT_INIT;
// Evaluate the expression
@ -171,7 +169,6 @@ Object vim_eval(String str, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return Result of the function call
Object vim_call_function(String fname, Array args, Error *err)
FUNC_ATTR_DEFERRED
{
Object rv = OBJECT_INIT;
if (args.size > MAX_FUNC_ARGS) {
@ -312,7 +309,6 @@ String vim_get_current_line(Error *err)
/// @param line The line contents
/// @param[out] err Details of an error that may have occurred
void vim_set_current_line(String line, Error *err)
FUNC_ATTR_DEFERRED
{
buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err);
}
@ -321,7 +317,6 @@ void vim_set_current_line(String line, Error *err)
///
/// @param[out] err Details of an error that may have occurred
void vim_del_current_line(Error *err)
FUNC_ATTR_DEFERRED
{
buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err);
}
@ -343,7 +338,6 @@ Object vim_get_var(String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return the old value if any
Object vim_set_var(String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
return dict_set_value(&globvardict, name, value, err);
}
@ -374,7 +368,6 @@ Object vim_get_option(String name, Error *err)
/// @param value The new option value
/// @param[out] err Details of an error that may have occurred
void vim_set_option(String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
set_option_to(NULL, SREQ_GLOBAL, name, value, err);
}
@ -383,7 +376,6 @@ void vim_set_option(String name, Object value, Error *err)
///
/// @param str The message
void vim_out_write(String str)
FUNC_ATTR_DEFERRED
{
write_msg(str, false);
}
@ -392,7 +384,6 @@ void vim_out_write(String str)
///
/// @param str The message
void vim_err_write(String str)
FUNC_ATTR_DEFERRED
{
write_msg(str, true);
}
@ -402,7 +393,6 @@ void vim_err_write(String str)
///
/// @param str The message
void vim_report_error(String str)
FUNC_ATTR_DEFERRED
{
vim_err_write(str);
vim_err_write((String) {.data = "\n", .size = 1});
@ -442,7 +432,6 @@ Buffer vim_get_current_buffer(void)
/// @param id The buffer handle
/// @param[out] err Details of an error that may have occurred
void vim_set_current_buffer(Buffer buffer, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -493,7 +482,6 @@ Window vim_get_current_window(void)
///
/// @param handle The window handle
void vim_set_current_window(Window window, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -545,7 +533,6 @@ Tabpage vim_get_current_tabpage(void)
/// @param handle The tab page handle
/// @param[out] err Details of an error that may have occurred
void vim_set_current_tabpage(Tabpage tabpage, Error *err)
FUNC_ATTR_DEFERRED
{
tabpage_T *tp = find_tab_by_handle(tabpage, err);
@ -609,6 +596,7 @@ Dictionary vim_get_color_map(void)
Array vim_get_api_info(uint64_t channel_id)
FUNC_ATTR_ASYNC
{
Array rv = ARRAY_DICT_INIT;

View File

@ -54,7 +54,6 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err)
/// @param pos the (row, col) tuple representing the new position
/// @param[out] err Details of an error that may have occurred
void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -118,7 +117,6 @@ Integer window_get_height(Window window, Error *err)
/// @param height the new height in rows
/// @param[out] err Details of an error that may have occurred
void window_set_height(Window window, Integer height, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -162,7 +160,6 @@ Integer window_get_width(Window window, Error *err)
/// @param width the new width in columns
/// @param[out] err Details of an error that may have occurred
void window_set_width(Window window, Integer width, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -208,7 +205,6 @@ Object window_get_var(Window window, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The old value
Object window_set_var(Window window, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -244,7 +240,6 @@ Object window_get_option(Window window, String name, Error *err)
/// @param value The option value
/// @param[out] err Details of an error that may have occurred
void window_set_option(Window window, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);

View File

@ -254,7 +254,7 @@ edit (
)
{
if (curbuf->terminal) {
terminal_enter(true);
terminal_enter();
return false;
}
@ -601,15 +601,15 @@ edit (
* Get a character for Insert mode. Ignore K_IGNORE.
*/
lastc = c; /* remember previous char for CTRL-D */
loop_enable_deferred_events(&loop);
input_enable_events();
do {
c = safe_vgetc();
} while (c == K_IGNORE);
loop_disable_deferred_events(&loop);
input_disable_events();
if (c == K_EVENT) {
c = lastc;
loop_process_event(&loop);
queue_process_events(loop.events);
continue;
}

View File

@ -468,6 +468,7 @@ typedef struct {
dict_T *self;
int *status_ptr;
uint64_t id;
Queue *events;
} TerminalJobData;
/// Structure representing current VimL to messagepack conversion state
@ -493,6 +494,13 @@ typedef struct {
/// Stack used to convert VimL values to messagepack.
typedef kvec_t(MPConvStackVal) MPConvStack;
typedef struct {
TerminalJobData *data;
ufunc_T *callback;
const char *type;
list_T *received;
int status;
} JobEvent;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "eval.c.generated.h"
@ -501,15 +509,6 @@ typedef kvec_t(MPConvStackVal) MPConvStack;
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
// Memory pool for reusing JobEvent structures
typedef struct {
TerminalJobData *data;
ufunc_T *callback;
const char *type;
list_T *received;
int status;
} JobEvent;
static int disable_job_defer = 0;
static uint64_t current_job_id = 1;
static PMap(uint64_t) *jobs = NULL;
@ -10778,7 +10777,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
return;
}
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
@ -10819,7 +10818,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
@ -10860,7 +10859,7 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv)
}
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
@ -11007,8 +11006,8 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)
}
TerminalJobData *data = pmap_get(uint64_t)(jobs, argvars[0].vval.v_number);
if (!data || data->stopped) {
TerminalJobData *data = find_job(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invjob));
return;
}
@ -11038,28 +11037,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
list_T *rv = list_alloc();
ui_busy_start();
// disable breakchecks, which could result in job callbacks being executed
// at unexpected places
disable_breakcheck++;
// disable job event deferring so the callbacks are processed while waiting.
if (!disable_job_defer++) {
// process any pending job events in the deferred queue, but only do this if
// deferred is not disabled(at the top-level `jobwait()` call)
loop_process_event(&loop);
}
Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop);
// For each item in the input list append an integer to the output list. -3
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
|| !(data = find_job(arg->li_tv.vval.v_number))) {
list_append_number(rv, -3);
} else {
// append the list item and set the status pointer so we'll collect the
// status code when the job exits
list_append_number(rv, -1);
data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
// Process any pending events for the job because we'll temporarily
// replace the parent queue
queue_process_events(data->events);
queue_replace_parent(data->events, waiting_jobs);
}
}
@ -11077,10 +11072,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
break;
}
if (arg->li_tv.v_type != VAR_NUMBER
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
|| !(data = find_job(arg->li_tv.vval.v_number))) {
continue;
}
int status = process_wait((Process *)&data->proc, remaining);
int status = process_wait((Process *)&data->proc, remaining, waiting_jobs);
if (status < 0) {
// interrupted or timed out, skip remaining jobs.
if (status == -2) {
@ -11100,23 +11095,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
}
}
// poll to ensure any pending callbacks from the last job are invoked
loop_poll_events(&loop, 0);
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
|| !(data = find_job(arg->li_tv.vval.v_number))) {
continue;
}
// remove the status pointer because the list may be freed before the
// job exits
data->status_ptr = NULL;
}
disable_job_defer--;
disable_breakcheck--;
ui_busy_stop();
// restore the parent queue for any jobs still alive
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
TerminalJobData *data = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
continue;
}
// restore the parent queue for the job
queue_process_events(data->events);
queue_replace_parent(data->events, loop.events);
}
queue_free(waiting_jobs);
ui_busy_stop();
rv->lv_refcount++;
rettv->v_type = VAR_LIST;
rettv->vval.v_list = rv;
@ -21053,17 +21056,21 @@ static inline TerminalJobData *common_job_init(char **argv, ufunc_T *on_stdout,
data->on_stderr = on_stderr;
data->on_exit = on_exit;
data->self = self;
data->events = queue_new_child(loop.events);
if (pty) {
data->proc.pty = pty_process_init(data);
data->proc.pty = pty_process_init(&loop, data);
} else {
data->proc.uv = uv_process_init(data);
data->proc.uv = uv_process_init(&loop, data);
}
Process *proc = (Process *)&data->proc;
proc->argv = argv;
proc->in = &data->in;
proc->out = &data->out;
proc->err = &data->err;
if (!pty) {
proc->err = &data->err;
}
proc->cb = on_process_exit;
proc->events = data->events;
return data;
}
@ -21094,8 +21101,12 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
{
data->refcount++;
Process *proc = (Process *)&data->proc;
if (!process_spawn(&loop, proc)) {
if (!process_spawn(proc)) {
EMSG(_(e_jobexe));
if (proc->type == kProcessTypePty) {
xfree(data->proc.pty.term_name);
free_term_job_data(data);
}
return false;
}
@ -21114,7 +21125,9 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
return true;
}
static inline void free_term_job_data(TerminalJobData *data) {
static inline void free_term_job_data_event(void **argv)
{
TerminalJobData *data = argv[0];
if (data->on_stdout) {
user_func_unref(data->on_stdout);
}
@ -21129,17 +21142,25 @@ static inline void free_term_job_data(TerminalJobData *data) {
data->self->internal_refcount--;
dict_unref(data->self);
}
queue_free(data->events);
xfree(data);
}
static inline void free_term_job_data(TerminalJobData *data)
{
// data->queue may still be used after this function returns(process_wait), so
// only free in the next event loop iteration
queue_put(loop.fast_events, free_term_job_data_event, 1, data);
}
// vimscript job callbacks must be executed on Nvim main loop
static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
static inline void process_job_event(TerminalJobData *data, ufunc_T *callback,
const char *type, char *buf, size_t count, int status)
{
JobEvent *event_data = xmalloc(sizeof(JobEvent));
event_data->received = NULL;
JobEvent event_data;
event_data.received = NULL;
if (buf) {
event_data->received = list_alloc();
event_data.received = list_alloc();
char *ptr = buf;
size_t remaining = count;
size_t off = 0;
@ -21147,7 +21168,7 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
while (off < remaining) {
// append the line
if (ptr[off] == NL) {
list_append_string(event_data->received, (uint8_t *)ptr, off);
list_append_string(event_data.received, (uint8_t *)ptr, off);
size_t skip = off + 1;
ptr += skip;
remaining -= skip;
@ -21160,51 +21181,53 @@ static inline void push_job_event(TerminalJobData *data, ufunc_T *callback,
}
off++;
}
list_append_string(event_data->received, (uint8_t *)ptr, off);
list_append_string(event_data.received, (uint8_t *)ptr, off);
} else {
event_data->status = status;
event_data.status = status;
}
event_data->data = data;
event_data->callback = callback;
event_data->type = type;
loop_push_event(&loop, (Event) {
.handler = on_job_event,
.data = event_data
}, !disable_job_defer);
event_data.data = data;
event_data.callback = callback;
event_data.type = type;
on_job_event(&event_data);
}
static void on_job_stdout(Stream *stream, RBuffer *buf, void *job, bool eof)
static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
void *job, bool eof)
{
TerminalJobData *data = job;
on_job_output(stream, job, buf, eof, data->on_stdout, "stdout");
on_job_output(stream, job, buf, count, eof, data->on_stdout, "stdout");
}
static void on_job_stderr(Stream *stream, RBuffer *buf, void *job, bool eof)
static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
void *job, bool eof)
{
TerminalJobData *data = job;
on_job_output(stream, job, buf, eof, data->on_stderr, "stderr");
on_job_output(stream, job, buf, count, eof, data->on_stderr, "stderr");
}
static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf,
bool eof, ufunc_T *callback, const char *type)
size_t count, bool eof, ufunc_T *callback, const char *type)
{
if (eof) {
return;
}
RBUFFER_UNTIL_EMPTY(buf, ptr, len) {
// The order here matters, the terminal must receive the data first because
// push_job_event will modify the read buffer(convert NULs into NLs)
if (data->term) {
terminal_receive(data->term, ptr, len);
}
// stub variable, to keep reading consistent with the order of events, only
// consider the count parameter.
size_t r;
char *ptr = rbuffer_read_ptr(buf, &r);
if (callback) {
push_job_event(data, callback, type, ptr, len, 0);
}
rbuffer_consumed(buf, len);
// The order here matters, the terminal must receive the data first because
// process_job_event will modify the read buffer(convert NULs into NLs)
if (data->term) {
terminal_receive(data->term, ptr, count);
}
if (callback) {
process_job_event(data, callback, type, ptr, count, 0);
}
rbuffer_consumed(buf, count);
}
static void on_process_exit(Process *proc, int status, void *d)
@ -21220,7 +21243,7 @@ static void on_process_exit(Process *proc, int status, void *d)
*data->status_ptr = status;
}
push_job_event(data, data->on_exit, "exit", NULL, 0, status);
process_job_event(data, data->on_exit, "exit", NULL, 0, status);
}
static void term_write(char *buf, size_t size, void *d)
@ -21254,10 +21277,8 @@ static void term_job_data_decref(TerminalJobData *data)
}
}
static void on_job_event(Event event)
static void on_job_event(JobEvent *ev)
{
JobEvent *ev = event.data;
if (!ev->callback) {
goto end;
}
@ -21302,7 +21323,15 @@ end:
pmap_del(uint64_t)(jobs, ev->data->id);
term_job_data_decref(ev->data);
}
xfree(ev);
}
static TerminalJobData *find_job(uint64_t id)
{
TerminalJobData *data = pmap_get(uint64_t)(jobs, id);
if (!data || data->stopped) {
return NULL;
}
return data;
}
static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv)

39
src/nvim/event/defs.h Normal file
View File

@ -0,0 +1,39 @@
#ifndef NVIM_EVENT_DEFS_H
#define NVIM_EVENT_DEFS_H
#include <assert.h>
#include <stdarg.h>
#define EVENT_HANDLER_MAX_ARGC 4
typedef void (*argv_callback)(void **argv);
typedef struct message {
int priority;
argv_callback handler;
void *argv[EVENT_HANDLER_MAX_ARGC];
} Event;
#define VA_EVENT_INIT(event, p, h, a) \
do { \
assert(a <= EVENT_HANDLER_MAX_ARGC); \
(event)->priority = p; \
(event)->handler = h; \
if (a) { \
va_list args; \
va_start(args, a); \
for (int i = 0; i < a; i++) { \
(event)->argv[i] = va_arg(args, void *); \
} \
va_end(args); \
} \
} while (0)
static inline Event event_create(int priority, argv_callback cb, int argc, ...)
{
assert(argc <= EVENT_HANDLER_MAX_ARGC);
Event event;
VA_EVENT_INIT(&event, priority, cb, argc);
return event;
}
#endif // NVIM_EVENT_DEFS_H

View File

@ -1,3 +1,4 @@
#include <stdarg.h>
#include <stdint.h>
#include <uv.h>
@ -9,17 +10,23 @@
# include "event/loop.c.generated.h"
#endif
typedef struct idle_event {
uv_idle_t idle;
Event event;
} IdleEvent;
void loop_init(Loop *loop, void *data)
{
uv_loop_init(&loop->uv);
loop->uv.data = loop;
loop->deferred_events = kl_init(Event);
loop->immediate_events = kl_init(Event);
loop->children = kl_init(WatcherPtr);
loop->children_stop_requests = 0;
loop->events = queue_new_parent(loop_on_put, loop);
loop->fast_events = queue_new_child(loop->events);
uv_signal_init(&loop->uv, &loop->children_watcher);
uv_timer_init(&loop->uv, &loop->children_kill_timer);
uv_timer_init(&loop->uv, &loop->poll_timer);
}
void loop_poll_events(Loop *loop, int ms)
@ -30,89 +37,36 @@ void loop_poll_events(Loop *loop, int ms)
abort(); // Should not re-enter uv_run
}
bool wait = true;
uv_timer_t timer;
uv_run_mode mode = UV_RUN_ONCE;
if (ms > 0) {
uv_timer_init(&loop->uv, &timer);
// Use a repeating timeout of ms milliseconds to make sure
// we do not block indefinitely for I/O.
uv_timer_start(&timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
uv_timer_start(&loop->poll_timer, timer_cb, (uint64_t)ms, (uint64_t)ms);
} else if (ms == 0) {
// For ms == 0, we need to do a non-blocking event poll by
// setting the run mode to UV_RUN_NOWAIT.
wait = false;
mode = UV_RUN_NOWAIT;
}
if (wait) {
loop_run_once(loop);
} else {
loop_run_nowait(loop);
}
uv_run(&loop->uv, mode);
if (ms > 0) {
// Ensure the timer handle is closed and run the event loop
// once more to let libuv perform it's cleanup
uv_timer_stop(&timer);
uv_close((uv_handle_t *)&timer, NULL);
loop_run_nowait(loop);
uv_timer_stop(&loop->poll_timer);
}
recursive--; // Can re-enter uv_run now
process_events_from(loop->immediate_events);
queue_process_events(loop->fast_events);
}
bool loop_has_deferred_events(Loop *loop)
{
return loop->deferred_events_allowed && !kl_empty(loop->deferred_events);
}
void loop_enable_deferred_events(Loop *loop)
{
++loop->deferred_events_allowed;
}
void loop_disable_deferred_events(Loop *loop)
{
--loop->deferred_events_allowed;
}
// Queue an event
void loop_push_event(Loop *loop, Event event, bool deferred)
void loop_on_put(Queue *queue, void *data)
{
Loop *loop = data;
// Sometimes libuv will run pending callbacks(timer for example) before
// blocking for a poll. If this happens and the callback pushes a event to one
// of the queues, the event would only be processed after the poll
// returns(user hits a key for example). To avoid this scenario, we call
// uv_stop when a event is enqueued.
loop_stop(loop);
kl_push(Event, deferred ? loop->deferred_events : loop->immediate_events,
event);
}
void loop_process_event(Loop *loop)
{
process_events_from(loop->deferred_events);
}
void loop_run(Loop *loop)
{
uv_run(&loop->uv, UV_RUN_DEFAULT);
}
void loop_run_once(Loop *loop)
{
uv_run(&loop->uv, UV_RUN_ONCE);
}
void loop_run_nowait(Loop *loop)
{
uv_run(&loop->uv, UV_RUN_NOWAIT);
}
void loop_stop(Loop *loop)
{
uv_stop(&loop->uv);
}
@ -120,25 +74,12 @@ void loop_close(Loop *loop)
{
uv_close((uv_handle_t *)&loop->children_watcher, NULL);
uv_close((uv_handle_t *)&loop->children_kill_timer, NULL);
uv_close((uv_handle_t *)&loop->poll_timer, NULL);
do {
uv_run(&loop->uv, UV_RUN_DEFAULT);
} while (uv_loop_close(&loop->uv));
}
void loop_process_all_events(Loop *loop)
{
process_events_from(loop->immediate_events);
process_events_from(loop->deferred_events);
}
static void process_events_from(klist_t(Event) *queue)
{
while (!kl_empty(queue)) {
Event event = kl_shift(Event, queue);
event.handler(event);
}
}
static void timer_cb(uv_timer_t *handle)
{
}

View File

@ -7,38 +7,39 @@
#include "nvim/lib/klist.h"
#include "nvim/os/time.h"
typedef struct event Event;
typedef void (*event_handler)(Event event);
struct event {
void *data;
event_handler handler;
};
#include "nvim/event/queue.h"
typedef void * WatcherPtr;
#define _noop(x)
KLIST_INIT(WatcherPtr, WatcherPtr, _noop)
KLIST_INIT(Event, Event, _noop)
typedef struct loop {
uv_loop_t uv;
klist_t(Event) *deferred_events, *immediate_events;
int deferred_events_allowed;
Queue *events, *fast_events;
klist_t(WatcherPtr) *children;
uv_signal_t children_watcher;
uv_timer_t children_kill_timer;
uv_timer_t children_kill_timer, poll_timer;
size_t children_stop_requests;
} Loop;
#define CREATE_EVENT(queue, handler, argc, ...) \
do { \
if (queue) { \
queue_put((queue), (handler), argc, __VA_ARGS__); \
} else { \
void *argv[argc] = {__VA_ARGS__}; \
(handler)(argv); \
} \
} while (0)
// Poll for events until a condition or timeout
#define LOOP_POLL_EVENTS_UNTIL(loop, timeout, condition) \
#define LOOP_PROCESS_EVENTS_UNTIL(loop, queue, timeout, condition) \
do { \
int remaining = timeout; \
uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
while (!(condition)) { \
loop_poll_events(loop, remaining); \
LOOP_PROCESS_EVENTS(loop, queue, remaining); \
if (remaining == 0) { \
break; \
} else if (remaining > 0) { \
@ -52,6 +53,16 @@ typedef struct loop {
} \
} while (0)
#define LOOP_PROCESS_EVENTS(loop, queue, timeout) \
do { \
if (queue && !queue_empty(queue)) { \
queue_process_events(queue); \
} else { \
loop_poll_events(loop, timeout); \
} \
} while (0)
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/loop.h.generated.h"
#endif

View File

@ -22,7 +22,7 @@
#define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define CLOSE_PROC_STREAM(proc, stream) \
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
stream_close(proc->stream, NULL); \
@ -30,19 +30,18 @@
} while (0)
bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
{
proc->loop = loop;
if (proc->in) {
uv_pipe_init(&loop->uv, &proc->in->uv.pipe, 0);
uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
}
if (proc->out) {
uv_pipe_init(&loop->uv, &proc->out->uv.pipe, 0);
uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
}
if (proc->err) {
uv_pipe_init(&loop->uv, &proc->err->uv.pipe, 0);
uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
}
bool success;
@ -77,6 +76,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
if (proc->in) {
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
proc->refcount++;
@ -84,6 +84,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
if (proc->out) {
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
proc->refcount++;
@ -91,6 +92,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
if (proc->err) {
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
proc->refcount++;
@ -99,7 +101,7 @@ bool process_spawn(Loop *loop, Process *proc) FUNC_ATTR_NONNULL_ALL
proc->internal_exit_cb = on_process_exit;
proc->internal_close_cb = decref;
proc->refcount++;
kl_push(WatcherPtr, loop->children, proc);
kl_push(WatcherPtr, proc->loop->children, proc);
return true;
}
@ -113,7 +115,7 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
}
// Wait until all children exit
LOOP_POLL_EVENTS_UNTIL(loop, -1, kl_empty(loop->children));
LOOP_PROCESS_EVENTS_UNTIL(loop, loop->events, -1, kl_empty(loop->children));
pty_process_teardown(loop);
}
@ -150,16 +152,24 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// indistinguishable from the process returning -1 by itself. Which
/// is possible on some OS. Returns -2 if an user has interruped the
/// wait.
int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1)
{
// The default status is -1, which represents a timeout
int status = -1;
bool interrupted = false;
if (!proc->refcount) {
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
return proc->status;
}
if (!events) {
events = proc->events;
}
// Increase refcount to stop the exit callback from being called(and possibly
// being freed) before we have a chance to get the status.
proc->refcount++;
LOOP_POLL_EVENTS_UNTIL(proc->loop, ms,
LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms,
// Until...
got_int || // interrupted by the user
proc->refcount == 1); // job exited
@ -171,12 +181,12 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
got_int = false;
process_stop(proc);
if (ms == -1) {
// We can only return, if all streams/handles are closed and the job
// We can only return if all streams/handles are closed and the job
// exited.
LOOP_POLL_EVENTS_UNTIL(proc->loop, -1, proc->refcount == 1);
LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1,
proc->refcount == 1);
} else {
loop_poll_events(proc->loop, 0);
LOOP_PROCESS_EVENTS(proc->loop, events, 0);
}
}
@ -185,6 +195,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
// resources
status = interrupted ? -2 : proc->status;
decref(proc);
if (events) {
// the decref call created an exit event, process it now
queue_process_events(events);
}
} else {
proc->refcount--;
}
@ -250,6 +264,18 @@ static void children_kill_cb(uv_timer_t *handle)
}
}
static void process_close_event(void **argv)
{
Process *proc = argv[0];
shell_free_argv(proc->argv);
if (proc->type == kProcessTypePty) {
xfree(((PtyProcess *)proc)->term_name);
}
if (proc->cb) {
proc->cb(proc, proc->status, proc->data);
}
}
static void decref(Process *proc)
{
if (--proc->refcount != 0) {
@ -264,16 +290,9 @@ static void decref(Process *proc)
break;
}
}
assert(node);
kl_shift_at(WatcherPtr, loop->children, node);
shell_free_argv(proc->argv);
if (proc->type == kProcessTypePty) {
xfree(((PtyProcess *)proc)->term_name);
}
if (proc->cb) {
proc->cb(proc, proc->status, proc->data);
}
CREATE_EVENT(proc->events, process_close_event, 1, proc);
}
static void process_close(Process *proc)
@ -293,28 +312,27 @@ static void process_close(Process *proc)
}
}
static void process_close_handles(void **argv)
{
Process *proc = argv[0];
process_close_streams(proc);
process_close(proc);
}
static void on_process_exit(Process *proc)
{
if (exiting) {
on_process_exit_event((Event) {.data = proc});
} else {
loop_push_event(proc->loop,
(Event) {.handler = on_process_exit_event, .data = proc}, false);
}
Loop *loop = proc->loop;
if (loop->children_stop_requests && !--loop->children_stop_requests) {
if (proc->stopped_time && loop->children_stop_requests
&& !--loop->children_stop_requests) {
// Stop the timer if no more stop requests are pending
DLOG("Stopping process kill timer");
uv_timer_stop(&loop->children_kill_timer);
}
}
static void on_process_exit_event(Event event)
{
Process *proc = event.data;
process_close_streams(proc);
process_close(proc);
// Process handles are closed in the next event loop tick. This is done to
// give libuv more time to read data from the OS after the process exits(If
// process_close_streams is called with data still in the OS buffer, we lose
// it)
CREATE_EVENT(proc->events, process_close_handles, 1, proc);
}
static void on_process_stream_close(Stream *stream, void *data)

View File

@ -26,14 +26,16 @@ struct process {
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
bool closed, term_sent;
Queue *events;
};
static inline Process process_init(ProcessType type, void *data)
static inline Process process_init(Loop *loop, ProcessType type, void *data)
{
return (Process) {
.type = type,
.data = data,
.loop = NULL,
.loop = loop,
.events = NULL,
.pid = 0,
.status = 0,
.refcount = 0,

View File

@ -33,17 +33,18 @@
# include "event/pty_process.c.generated.h"
#endif
static const unsigned int KILL_RETRIES = 5;
static const unsigned int KILL_TIMEOUT = 2; // seconds
bool pty_process_spawn(PtyProcess *ptyproc)
FUNC_ATTR_NONNULL_ALL
{
static struct termios termios;
if (!termios.c_cflag) {
init_termios(&termios);
}
Process *proc = (Process *)ptyproc;
assert(!proc->err);
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
ptyproc->winsize = (struct winsize){ptyproc->height, ptyproc->width, 0, 0};
struct termios termios;
init_termios(&termios);
uv_disable_stdio_inheritance();
int master;
int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
@ -73,9 +74,6 @@ bool pty_process_spawn(PtyProcess *ptyproc)
if (proc->out && !set_duplicating_descriptor(master, &proc->out->uv.pipe)) {
goto error;
}
if (proc->err && !set_duplicating_descriptor(master, &proc->err->uv.pipe)) {
goto error;
}
ptyproc->tty_fd = master;
proc->pid = pid;
@ -83,19 +81,8 @@ bool pty_process_spawn(PtyProcess *ptyproc)
error:
close(master);
// terminate spawned process
kill(pid, SIGTERM);
int status, child;
unsigned int try = 0;
while (try++ < KILL_RETRIES && !(child = waitpid(pid, &status, WNOHANG))) {
sleep(KILL_TIMEOUT);
}
if (child != pid) {
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
}
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
return false;
}
@ -152,7 +139,6 @@ static void init_child(PtyProcess *ptyproc) FUNC_ATTR_NONNULL_ALL
static void init_termios(struct termios *termios) FUNC_ATTR_NONNULL_ALL
{
memset(termios, 0, sizeof(struct termios));
// Taken from pangoterm
termios->c_iflag = ICRNL|IXON;
termios->c_oflag = OPOST|ONLCR;

View File

@ -13,10 +13,10 @@ typedef struct pty_process {
int tty_fd;
} PtyProcess;
static inline PtyProcess pty_process_init(void *data)
static inline PtyProcess pty_process_init(Loop *loop, void *data)
{
PtyProcess rv;
rv.process = process_init(kProcessTypePty, data);
rv.process = process_init(loop, kProcessTypePty, data);
rv.term_name = NULL;
rv.width = 80;
rv.height = 24;

208
src/nvim/event/queue.c Normal file
View File

@ -0,0 +1,208 @@
// Queue for selective async event processing. Instances of this queue support a
// parent/child relationship with the following properties:
//
// - pushing a node to a child queue will push a corresponding link node to the
// parent queue
// - removing a link node from a parent queue will remove the next node
// in the linked child queue
// - removing a node from a child queue will remove the corresponding link node
// in the parent queue
//
// These properties allow neovim to organize and process events from different
// sources with a certain degree of control. Here's how the queue is used:
//
// +----------------+
// | Main loop |
// +----------------+
// ^
// |
// +----------------+
// +-------------->| Event loop |<------------+
// | +--+-------------+ |
// | ^ ^ |
// | | | |
// +-----------+ +-----------+ +---------+ +---------+
// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 |
// +-----------+ +-----------+ +---------+ +---------+
//
//
// In the above diagram, the lower boxes represents event emitters, each with
// it's own private queue that have the event loop queue as the parent.
//
// When idle, the main loop spins the event loop which queues events from many
// sources(channels, jobs, user...). Each event emitter pushes events to its own
// private queue which is propagated to the event loop queue. When the main loop
// consumes an event, the corresponding event is removed from the emitter's
// queue.
//
// The main reason for this queue hierarchy is to allow focusing on a single
// event emitter while blocking the main loop. For example, if the `jobwait`
// vimscript function is called on job1, the main loop will temporarily stop
// polling the event loop queue and poll job1 queue instead. Same with channels,
// when calling `rpcrequest`, we want to temporarily stop processing events from
// other sources and focus on a specific channel.
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <uv.h>
#include "nvim/event/queue.h"
#include "nvim/memory.h"
#include "nvim/os/time.h"
typedef struct queue_item QueueItem;
struct queue_item {
union {
Queue *queue;
struct {
Event event;
QueueItem *parent;
} item;
} data;
bool link; // this is just a link to a node in a child queue
QUEUE node;
};
struct queue {
Queue *parent;
QUEUE headtail;
put_callback put_cb;
void *data;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/queue.c.generated.h"
#endif
static Event NILEVENT = {.handler = NULL, .argv = {NULL}};
Queue *queue_new_parent(put_callback put_cb, void *data)
{
return queue_new(NULL, put_cb, data);
}
Queue *queue_new_child(Queue *parent)
FUNC_ATTR_NONNULL_ALL
{
assert(!parent->parent);
return queue_new(parent, NULL, NULL);
}
static Queue *queue_new(Queue *parent, put_callback put_cb, void *data)
{
Queue *rv = xmalloc(sizeof(Queue));
QUEUE_INIT(&rv->headtail);
rv->parent = parent;
rv->put_cb = put_cb;
rv->data = data;
return rv;
}
void queue_free(Queue *queue)
{
assert(queue);
if (queue->parent) {
while (!QUEUE_EMPTY(&queue->headtail)) {
QUEUE *q = QUEUE_HEAD(&queue->headtail);
QueueItem *item = queue_node_data(q);
assert(!item->link);
QUEUE_REMOVE(&item->data.item.parent->node);
xfree(item->data.item.parent);
QUEUE_REMOVE(q);
xfree(item);
}
}
xfree(queue);
}
Event queue_get(Queue *queue)
{
return queue_empty(queue) ? NILEVENT : queue_remove(queue);
}
void queue_put_event(Queue *queue, Event event)
{
assert(queue);
assert(queue->parent); // don't push directly to the parent queue
queue_push(queue, event);
if (queue->parent->put_cb) {
queue->parent->put_cb(queue->parent, queue->parent->data);
}
}
void queue_process_events(Queue *queue)
{
assert(queue);
while (!queue_empty(queue)) {
Event event = queue_get(queue);
if (event.handler) {
event.handler(event.argv);
}
}
}
bool queue_empty(Queue *queue)
{
assert(queue);
return QUEUE_EMPTY(&queue->headtail);
}
void queue_replace_parent(Queue *queue, Queue *new_parent)
{
assert(queue_empty(queue));
queue->parent = new_parent;
}
static Event queue_remove(Queue *queue)
{
assert(!queue_empty(queue));
QUEUE *h = QUEUE_HEAD(&queue->headtail);
QUEUE_REMOVE(h);
QueueItem *item = queue_node_data(h);
Event rv;
if (item->link) {
assert(!queue->parent);
// remove the next node in the linked queue
Queue *linked = item->data.queue;
assert(!queue_empty(linked));
QueueItem *child =
queue_node_data(QUEUE_HEAD(&linked->headtail));
QUEUE_REMOVE(&child->node);
rv = child->data.item.event;
xfree(child);
} else {
assert(queue->parent);
assert(!queue_empty(queue->parent));
// remove the corresponding link node in the parent queue
QUEUE_REMOVE(&item->data.item.parent->node);
xfree(item->data.item.parent);
rv = item->data.item.event;
}
xfree(item);
return rv;
}
static void queue_push(Queue *queue, Event event)
{
QueueItem *item = xmalloc(sizeof(QueueItem));
item->link = false;
item->data.item.event = event;
QUEUE_INSERT_TAIL(&queue->headtail, &item->node);
// push link node to the parent queue
item->data.item.parent = xmalloc(sizeof(QueueItem));
item->data.item.parent->link = true;
item->data.item.parent->data.queue = queue;
QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node);
}
static QueueItem *queue_node_data(QUEUE *q)
{
return QUEUE_DATA(q, QueueItem, node);
}

19
src/nvim/event/queue.h Normal file
View File

@ -0,0 +1,19 @@
#ifndef NVIM_EVENT_QUEUE_H
#define NVIM_EVENT_QUEUE_H
#include <uv.h>
#include "nvim/event/defs.h"
#include "nvim/lib/queue.h"
typedef struct queue Queue;
typedef void (*put_callback)(Queue *queue, void *data);
#define queue_put(q, h, ...) \
queue_put_event(q, event_create(1, h, __VA_ARGS__));
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/queue.h.generated.h"
#endif
#endif // NVIM_EVENT_QUEUE_H

View File

@ -49,6 +49,7 @@ void rstream_init(Stream *stream, size_t bufsize)
///
/// @param stream The `Stream` instance
void rstream_start(Stream *stream, stream_read_cb cb)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
if (stream->uvstream) {
@ -62,6 +63,7 @@ void rstream_start(Stream *stream, stream_read_cb cb)
///
/// @param stream The `Stream` instance
void rstream_stop(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
if (stream->uvstream) {
uv_read_stop(stream->uvstream);
@ -112,7 +114,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(uvstream);
invoke_read_cb(stream, true);
invoke_read_cb(stream, 0, true);
}
return;
}
@ -122,7 +124,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
invoke_read_cb(stream, false);
invoke_read_cb(stream, nread, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@ -156,7 +158,7 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(&stream->uv.idle);
invoke_read_cb(stream, true);
invoke_read_cb(stream, 0, true);
return;
}
@ -164,12 +166,21 @@ static void fread_idle_cb(uv_idle_t *handle)
size_t nread = (size_t) req.result;
rbuffer_produced(stream->buffer, nread);
stream->fpos += nread;
invoke_read_cb(stream, false);
invoke_read_cb(stream, nread, false);
}
static void invoke_read_cb(Stream *stream, bool eof)
static void read_event(void **argv)
{
Stream *stream = argv[0];
if (stream->read_cb) {
stream->read_cb(stream, stream->buffer, stream->data, eof);
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
stream->read_cb(stream, stream->buffer, count, stream->data, eof);
}
}
static void invoke_read_cb(Stream *stream, size_t count, bool eof)
{
CREATE_EVENT(stream->events, read_event, 3, stream,
(void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
}

View File

@ -15,6 +15,7 @@ void signal_watcher_init(Loop *loop, SignalWatcher *watcher, void *data)
watcher->uv.data = watcher;
watcher->data = data;
watcher->cb = NULL;
watcher->events = loop->fast_events;
}
void signal_watcher_start(SignalWatcher *watcher, signal_cb cb, int signum)
@ -37,10 +38,16 @@ void signal_watcher_close(SignalWatcher *watcher, signal_close_cb cb)
uv_close((uv_handle_t *)&watcher->uv, close_cb);
}
static void signal_event(void **argv)
{
SignalWatcher *watcher = argv[0];
watcher->cb(watcher, watcher->uv.signum, watcher->data);
}
static void signal_watcher_cb(uv_signal_t *handle, int signum)
{
SignalWatcher *watcher = handle->data;
watcher->cb(watcher, signum, watcher->data);
CREATE_EVENT(watcher->events, signal_event, 1, watcher);
}
static void close_cb(uv_handle_t *handle)

View File

@ -14,6 +14,7 @@ struct signal_watcher {
void *data;
signal_cb cb;
signal_close_cb close_cb;
Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS

View File

@ -77,6 +77,7 @@ void socket_watcher_init(Loop *loop, SocketWatcher *watcher,
watcher->stream->data = watcher;
watcher->cb = NULL;
watcher->close_cb = NULL;
watcher->events = NULL;
}
int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
@ -113,6 +114,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
}
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
@ -142,10 +144,18 @@ void socket_watcher_close(SocketWatcher *watcher, socket_close_cb cb)
uv_close((uv_handle_t *)watcher->stream, close_cb);
}
static void connection_event(void **argv)
{
SocketWatcher *watcher = argv[0];
int status = (int)(uintptr_t)(argv[1]);
watcher->cb(watcher, status, watcher->data);
}
static void connection_cb(uv_stream_t *handle, int status)
{
SocketWatcher *watcher = handle->data;
watcher->cb(watcher, status, watcher->data);
CREATE_EVENT(watcher->events, connection_event, 2, watcher,
(void *)(uintptr_t)status);
}
static void close_cb(uv_handle_t *handle)

View File

@ -30,6 +30,7 @@ struct socket_watcher {
void *data;
socket_cb cb;
socket_close_cb close_cb;
Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS

View File

@ -32,6 +32,7 @@ int stream_set_blocking(int fd, bool blocking)
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
void *data)
FUNC_ATTR_NONNULL_ARG(2)
{
stream->uvstream = uvstream;
@ -55,6 +56,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
if (stream->uvstream) {
stream->uvstream->data = stream;
loop = stream->uvstream->loop->data;
}
stream->data = data;
@ -69,16 +71,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->internal_close_cb = NULL;
stream->closed = false;
stream->buffer = NULL;
stream->events = NULL;
}
void stream_close(Stream *stream, stream_close_cb on_stream_close)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
stream->closed = true;
stream->close_cb = on_stream_close;
@ -88,6 +87,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close)
}
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{
if (stream->uvstream) {
uv_close((uv_handle_t *)stream->uvstream, close_cb);
@ -99,6 +99,9 @@ void stream_close_handle(Stream *stream)
static void close_cb(uv_handle_t *handle)
{
Stream *stream = handle->data;
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
if (stream->close_cb) {
stream->close_cb(stream, stream->data);
}

View File

@ -14,10 +14,14 @@ typedef struct stream Stream;
///
/// @param stream The Stream instance
/// @param rbuffer The associated RBuffer instance
/// @param count Number of bytes to read. This must be respected if keeping
/// the order of events is a requirement. This is because events
/// may be queued and only processed later when more data is copied
/// into to the buffer, so one read may starve another.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, void *data,
bool eof);
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof);
/// Type of function called when the Stream has information about a write
/// request.
@ -47,6 +51,7 @@ struct stream {
size_t pending_reqs;
void *data, *internal_data;
bool closed;
Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS

View File

@ -16,6 +16,7 @@ void time_watcher_init(Loop *loop, TimeWatcher *watcher, void *data)
uv_timer_init(&loop->uv, &watcher->uv);
watcher->uv.data = watcher;
watcher->data = data;
watcher->events = loop->fast_events;
}
void time_watcher_start(TimeWatcher *watcher, time_cb cb, uint64_t timeout,
@ -39,11 +40,17 @@ void time_watcher_close(TimeWatcher *watcher, time_cb cb)
uv_close((uv_handle_t *)&watcher->uv, close_cb);
}
static void time_event(void **argv)
{
TimeWatcher *watcher = argv[0];
watcher->cb(watcher, watcher->data);
}
static void time_watcher_cb(uv_timer_t *handle)
FUNC_ATTR_NONNULL_ALL
{
TimeWatcher *watcher = handle->data;
watcher->cb(watcher, watcher->data);
CREATE_EVENT(watcher->events, time_event, 1, watcher);
}
static void close_cb(uv_handle_t *handle)

View File

@ -12,6 +12,7 @@ struct time_watcher {
uv_timer_t uv;
void *data;
time_cb cb, close_cb;
Queue *events;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS

View File

@ -12,10 +12,10 @@ typedef struct uv_process {
uv_stdio_container_t uvstdio[3];
} UvProcess;
static inline UvProcess uv_process_init(void *data)
static inline UvProcess uv_process_init(Loop *loop, void *data)
{
UvProcess rv;
rv.process = process_init(kProcessTypeUv, data);
rv.process = process_init(loop, kProcessTypeUv, data);
return rv;
}

View File

@ -118,6 +118,7 @@ WBuffer *wstream_new_buffer(char *data,
size_t size,
size_t refcount,
wbuffer_data_finalizer cb)
FUNC_ATTR_NONNULL_ARG(1)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
@ -151,6 +152,7 @@ static void write_cb(uv_write_t *req, int status)
}
void wstream_release_wbuffer(WBuffer *buffer)
FUNC_ATTR_NONNULL_ALL
{
if (!--buffer->refcount) {
if (buffer->cb) {

View File

@ -62,6 +62,7 @@
#include "nvim/tag.h"
#include "nvim/window.h"
#include "nvim/ui.h"
#include "nvim/os/input.h"
#include "nvim/os/os.h"
#include "nvim/event/loop.h"
@ -298,14 +299,14 @@ getcmdline (
/* Get a character. Ignore K_IGNORE, it should not do anything, such
* as stop completion. */
loop_enable_deferred_events(&loop);
input_enable_events();
do {
c = safe_vgetc();
} while (c == K_IGNORE);
loop_disable_deferred_events(&loop);
input_disable_events();
if (c == K_EVENT) {
loop_process_event(&loop);
queue_process_events(loop.events);
continue;
}

View File

@ -179,7 +179,7 @@
#endif
#ifdef DEFINE_FUNC_ATTRIBUTES
#define FUNC_ATTR_DEFERRED
#define FUNC_ATTR_ASYNC
#define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC
#define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x)
#define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y)

View File

@ -898,14 +898,6 @@ EXTERN FILE *scriptout INIT(= NULL); /* stream to write script to */
/* volatile because it is used in signal handler catch_sigint(). */
EXTERN volatile int got_int INIT(= FALSE); /* set to TRUE when interrupt
signal occurred */
EXTERN int disable_breakcheck INIT(= 0); // > 0 if breakchecks should be
// ignored. FIXME(tarruda): Hacky
// way to run functions that would
// result in *_breakcheck calls
// while events that would normally
// be deferred are being processed
// immediately. Ref:
// neovim/neovim#2371
EXTERN int bangredo INIT(= FALSE); /* set to TRUE with ! command */
EXTERN int searchcmdlen; /* length of previous search cmd */
EXTERN int reg_do_extmatch INIT(= 0); /* Used when compiling regexp:

92
src/nvim/lib/queue.h Normal file
View File

@ -0,0 +1,92 @@
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#ifndef QUEUE_H_
#define QUEUE_H_
typedef void *QUEUE[2];
/* Private macros. */
#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Public macros. */
#define QUEUE_DATA(ptr, type, field) \
((type *) ((char *) (ptr) - ((char *) &((type *) 0)->field)))
#define QUEUE_FOREACH(q, h) \
for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
#define QUEUE_EMPTY(q) \
((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
#define QUEUE_HEAD(q) \
(QUEUE_NEXT(q))
#define QUEUE_INIT(q) \
do { \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
} \
while (0)
#define QUEUE_ADD(h, n) \
do { \
QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV(h) = QUEUE_PREV(n); \
QUEUE_PREV_NEXT(h) = (h); \
} \
while (0)
#define QUEUE_SPLIT(h, q, n) \
do { \
QUEUE_PREV(n) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(n) = (n); \
QUEUE_NEXT(n) = (q); \
QUEUE_PREV(h) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(h) = (h); \
QUEUE_PREV(q) = (n); \
} \
while (0)
#define QUEUE_INSERT_HEAD(h, q) \
do { \
QUEUE_NEXT(q) = QUEUE_NEXT(h); \
QUEUE_PREV(q) = (h); \
QUEUE_NEXT_PREV(q) = (q); \
QUEUE_NEXT(h) = (q); \
} \
while (0)
#define QUEUE_INSERT_TAIL(h, q) \
do { \
QUEUE_NEXT(q) = (h); \
QUEUE_PREV(q) = QUEUE_PREV(h); \
QUEUE_PREV_NEXT(q) = (q); \
QUEUE_PREV(h) = (q); \
} \
while (0)
#define QUEUE_REMOVE(q) \
do { \
QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
} \
while (0)
#endif /* QUEUE_H_ */

View File

@ -157,11 +157,11 @@ void event_init(void)
void event_teardown(void)
{
if (!loop.deferred_events) {
if (!loop.events) {
return;
}
loop_process_all_events(&loop);
queue_process_events(loop.events);
input_stop();
channel_teardown();
process_teardown(&loop);

View File

@ -116,5 +116,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER)
MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER)
#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false}
#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .async = false}
MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER)

View File

@ -68,6 +68,7 @@ typedef struct {
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
Queue *events;
} Channel;
typedef struct {
@ -123,14 +124,14 @@ void channel_teardown(void)
uint64_t channel_from_process(char **argv)
{
Channel *channel = register_channel(kChannelTypeProc);
channel->data.process.uvproc = uv_process_init(channel);
channel->data.process.uvproc = uv_process_init(&loop, channel);
Process *proc = &channel->data.process.uvproc.process;
proc->argv = argv;
proc->in = &channel->data.process.in;
proc->out = &channel->data.process.out;
proc->err = &channel->data.process.err;
proc->cb = process_exit;
if (!process_spawn(&loop, proc)) {
if (!process_spawn(proc)) {
loop_poll_events(&loop, 0);
decref(channel);
return 0;
@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id,
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
channel->pending_requests++;
LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned);
LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@ -327,7 +328,8 @@ static void channel_from_stdio(void)
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
static void forward_stderr(Stream *stream, RBuffer *rbuf, void *data, bool eof)
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
@ -342,7 +344,8 @@ static void process_exit(Process *proc, int status, void *data)
decref(data);
}
static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
bool eof)
{
Channel *channel = data;
incref(channel);
@ -450,31 +453,31 @@ static void handle_request(Channel *channel, msgpack_object *request)
method->via.bin.size);
} else {
handler.fn = msgpack_rpc_handle_missing_method;
handler.defer = false;
handler.async = true;
}
Array args = ARRAY_DICT_INIT;
if (!msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
handler.fn = msgpack_rpc_handle_invalid_arguments;
handler.defer = false;
handler.async = true;
}
bool defer = (!kv_size(channel->call_stack) && handler.defer);
RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
incref(channel);
loop_push_event(&loop, (Event) {
.handler = on_request_event,
.data = event_data
}, defer);
if (handler.async) {
on_request_event((void **)&event_data);
} else {
queue_put(channel->events, on_request_event, 1, event_data);
}
}
static void on_request_event(Event event)
static void on_request_event(void **argv)
{
RequestEvent *e = event.data;
RequestEvent *e = argv[0];
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
@ -649,9 +652,8 @@ static void close_channel(Channel *channel)
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL);
stream_close(&channel->data.std.out, NULL);
loop_push_event(&loop,
(Event) { .handler = on_stdio_close, .data = channel }, false);
break;
queue_put(loop.fast_events, exit_event, 1, channel);
return;
default:
abort();
}
@ -659,9 +661,9 @@ static void close_channel(Channel *channel)
decref(channel);
}
static void on_stdio_close(Event e)
static void exit_event(void **argv)
{
decref(e.data);
decref(argv[0]);
if (!exiting) {
mch_exit(0);
@ -683,6 +685,7 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
queue_free(channel->events);
xfree(channel);
}
@ -694,6 +697,7 @@ static void close_cb(Stream *stream, void *data)
static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
rv->events = queue_new_child(loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;

View File

@ -11,9 +11,8 @@ typedef struct {
uint64_t request_id,
Array args,
Error *error);
bool defer; // Should the call be deferred to the main loop? This should
// be true if the function mutates editor data structures such
// as buffers, windows, tabs, or if it executes vimscript code.
bool async; // function is always safe to run immediately instead of being
// put in a request queue for handling when nvim waits for input.
} MsgpackRpcRequestHandler;
/// Initializes the msgpack-rpc method table

View File

@ -28,7 +28,7 @@ void remote_ui_init(void)
connected_uis = pmap_new(uint64_t)();
// Add handler for "attach_ui"
String method = cstr_as_string("ui_attach");
MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .defer = false};
MsgpackRpcRequestHandler handler = {.fn = remote_ui_attach, .async = true};
msgpack_rpc_add_method_handler(method, handler);
method = cstr_as_string("ui_detach");
handler.fn = remote_ui_detach;

View File

@ -63,6 +63,7 @@
#include "nvim/window.h"
#include "nvim/event/loop.h"
#include "nvim/os/time.h"
#include "nvim/os/input.h"
/*
* The Visual area is remembered for reselection.
@ -487,12 +488,12 @@ normal_cmd (
/*
* Get the command character from the user.
*/
loop_enable_deferred_events(&loop);
input_enable_events();
c = safe_vgetc();
loop_disable_deferred_events(&loop);
input_disable_events();
if (c == K_EVENT) {
loop_process_event(&loop);
queue_process_events(loop.events);
return;
}

View File

@ -33,6 +33,7 @@ static Stream read_stream = {.closed = true};
static RBuffer *input_buffer = NULL;
static bool input_eof = false;
static int global_fd = 0;
static int events_enabled = 0;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/input.c.generated.h"
@ -110,8 +111,8 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen);
}
// If there are deferred events, return the keys directly
if (loop_has_deferred_events(&loop)) {
// If there are events, return the keys directly
if (pending_events()) {
return push_event_key(buf, maxlen);
}
@ -131,11 +132,21 @@ bool os_char_avail(void)
// Check for CTRL-C typed by reading all available characters.
void os_breakcheck(void)
{
if (!disable_breakcheck && !got_int) {
if (!got_int) {
loop_poll_events(&loop, 0);
}
}
void input_enable_events(void)
{
events_enabled++;
}
void input_disable_events(void)
{
events_enabled--;
}
/// Test whether a file descriptor refers to a terminal.
///
/// @param fd File descriptor.
@ -281,7 +292,7 @@ static bool input_poll(int ms)
prof_inchar_enter();
}
LOOP_POLL_EVENTS_UNTIL(&loop, ms, input_ready() || input_eof);
LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, ms, input_ready() || input_eof);
if (do_profiling == PROF_YES && ms) {
prof_inchar_exit();
@ -305,7 +316,8 @@ static InbufPollResult inbuf_poll(int ms)
return input_eof ? kInputEof : kInputNone;
}
static void read_cb(Stream *stream, RBuffer *buf, void *data, bool at_eof)
static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
bool at_eof)
{
if (at_eof) {
input_eof = true;
@ -358,7 +370,7 @@ static bool input_ready(void)
{
return typebuf_was_filled || // API call filled typeahead
rbuffer_size(input_buffer) || // Input buffer filled
loop_has_deferred_events(&loop); // Events must be processed
pending_events(); // Events must be processed
}
// Exit because of an input read error.
@ -369,3 +381,8 @@ static void read_error_exit(void)
STRCPY(IObuff, _("Vim: Error reading input, exiting...\n"));
preserve_exit();
}
static bool pending_events(void)
{
return events_enabled && !queue_empty(loop.events);
}

View File

@ -205,13 +205,15 @@ static int do_os_system(char **argv,
xstrlcpy(prog, argv[0], MAXPATHL);
Stream in, out, err;
UvProcess uvproc = uv_process_init(&buf);
UvProcess uvproc = uv_process_init(&loop, &buf);
Process *proc = &uvproc.process;
Queue *events = queue_new_child(loop.events);
proc->events = events;
proc->argv = argv;
proc->in = input != NULL ? &in : NULL;
proc->out = &out;
proc->err = &err;
if (!process_spawn(&loop, proc)) {
if (!process_spawn(proc)) {
loop_poll_events(&loop, 0);
// Failed, probably due to `sh` not being executable
if (!silent) {
@ -219,14 +221,22 @@ static int do_os_system(char **argv,
msg_outtrans((char_u *)prog);
msg_putchar('\n');
}
queue_free(events);
return -1;
}
// We want to deal with stream events as fast a possible while queueing
// process events, so reset everything to NULL. It prevents closing the
// streams while there's still data in the OS buffer(due to the process
// exiting before all data is read).
if (input != NULL) {
proc->in->events = NULL;
wstream_init(proc->in, 0);
}
proc->out->events = NULL;
rstream_init(proc->out, 0);
rstream_start(proc->out, data_cb);
proc->err->events = NULL;
rstream_init(proc->err, 0);
rstream_start(proc->err, data_cb);
@ -247,7 +257,7 @@ static int do_os_system(char **argv,
// the UI
ui_busy_start();
ui_flush();
int status = process_wait(proc, -1);
int status = process_wait(proc, -1, NULL);
ui_busy_stop();
// prepare the out parameters if requested
@ -267,6 +277,9 @@ static int do_os_system(char **argv,
}
}
assert(queue_empty(events));
queue_free(events);
return status;
}
@ -285,7 +298,8 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
buf->data = xrealloc(buf->data, buf->cap);
}
static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
static void system_data_cb(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
DynamicBuffer *dbuf = data;
@ -295,7 +309,8 @@ static void system_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
dbuf->len += nread;
}
static void out_data_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
static void out_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data,
bool eof)
{
size_t cnt;
char *ptr = rbuffer_read_ptr(buf, &cnt);

View File

@ -115,16 +115,6 @@ static void deadly_signal(int signum)
static void on_signal(SignalWatcher *handle, int signum, void *data)
{
assert(signum >= 0);
loop_push_event(&loop, (Event) {
.handler = on_signal_event,
.data = (void *)(uintptr_t)signum
}, false);
}
static void on_signal_event(Event event)
{
int signum = (int)(uintptr_t)event.data;
switch (signum) {
#ifdef SIGPWR
case SIGPWR:
@ -148,4 +138,3 @@ static void on_signal_event(Event event)
break;
}
}

View File

@ -43,7 +43,7 @@ void os_delay(uint64_t milliseconds, bool ignoreinput)
if (milliseconds > INT_MAX) {
milliseconds = INT_MAX;
}
LOOP_POLL_EVENTS_UNTIL(&loop, (int)milliseconds, got_int);
LOOP_PROCESS_EVENTS_UNTIL(&loop, NULL, (int)milliseconds, got_int);
} else {
os_microdelay(milliseconds * 1000);
}

View File

@ -69,6 +69,7 @@
#include "nvim/fileio.h"
#include "nvim/event/loop.h"
#include "nvim/event/time.h"
#include "nvim/os/input.h"
#include "nvim/api/private/helpers.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
@ -324,7 +325,7 @@ void terminal_resize(Terminal *term, uint16_t width, uint16_t height)
invalidate_terminal(term, -1, -1);
}
void terminal_enter(bool process_deferred)
void terminal_enter(void)
{
Terminal *term = curbuf->terminal;
assert(term && "should only be called when curbuf has a terminal");
@ -353,15 +354,9 @@ void terminal_enter(bool process_deferred)
bool got_bs = false; // True if the last input was <C-\>
while (term->buf == curbuf) {
if (process_deferred) {
loop_enable_deferred_events(&loop);
}
input_enable_events();
c = safe_vgetc();
if (process_deferred) {
loop_disable_deferred_events(&loop);
}
input_disable_events();
switch (c) {
case K_LEFTMOUSE:
@ -381,7 +376,7 @@ void terminal_enter(bool process_deferred)
break;
case K_EVENT:
loop_process_event(&loop);
queue_process_events(loop.events);
break;
case Ctrl_N:
@ -427,7 +422,13 @@ void terminal_destroy(Terminal *term)
term->buf->terminal = NULL;
}
term->buf = NULL;
pmap_del(ptr_t)(invalidated_terminals, term);
if (pmap_has(ptr_t)(invalidated_terminals, term)) {
// flush any pending changes to the buffer
block_autocmds();
refresh_terminal(term);
unblock_autocmds();
pmap_del(ptr_t)(invalidated_terminals, term);
}
for (size_t i = 0 ; i < term->sb_current; i++) {
xfree(term->sb_buffer[i]);
}
@ -883,48 +884,47 @@ static void invalidate_terminal(Terminal *term, int start_row, int end_row)
}
}
static void refresh_terminal(Terminal *term)
{
// TODO(SplinterOfChaos): Find the condition that makes term->buf invalid.
bool valid = true;
if (!term->buf || !(valid = buf_valid(term->buf))) {
// destroyed by `close_buffer`. Dont do anything else
if (!valid) {
term->buf = NULL;
}
return;
}
bool pending_resize = term->pending_resize;
WITH_BUFFER(term->buf, {
refresh_size(term);
refresh_scrollback(term);
refresh_screen(term);
redraw_buf_later(term->buf, NOT_VALID);
});
adjust_topline(term, pending_resize);
}
// libuv timer callback. This will enqueue on_refresh to be processed as an
// event.
static void refresh_timer_cb(TimeWatcher *watcher, void *data)
{
loop_push_event(&loop, (Event) {.handler = on_refresh}, false);
refresh_pending = false;
}
// Refresh all invalidated terminals
static void on_refresh(Event event)
{
if (exiting) {
// bad things can happen if we redraw when exiting, and there's no need to
// update the buffer.
return;
goto end;
}
Terminal *term;
void *stub; (void)(stub);
// don't process autocommands while updating terminal buffers
block_autocmds();
map_foreach(invalidated_terminals, term, stub, {
// TODO(SplinterOfChaos): Find the condition that makes term->buf invalid.
bool valid = true;
if (!term->buf || !(valid = buf_valid(term->buf))) {
// destroyed by `close_buffer`. Dont do anything else
if (!valid) {
term->buf = NULL;
}
continue;
}
bool pending_resize = term->pending_resize;
WITH_BUFFER(term->buf, {
refresh_size(term);
refresh_scrollback(term);
refresh_screen(term);
redraw_buf_later(term->buf, NOT_VALID);
});
adjust_topline(term, pending_resize);
refresh_terminal(term);
});
pmap_clear(ptr_t)(invalidated_terminals);
unblock_autocmds();
redraw(true);
end:
refresh_pending = false;
}
static void refresh_size(Terminal *term)

View File

@ -206,9 +206,10 @@ static bool handle_forced_escape(TermInput *input)
return false;
}
static void restart_reading(Event event);
static void restart_reading(void **argv);
static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
bool eof)
{
TermInput *input = data;
@ -226,8 +227,7 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
// ls *.md | xargs nvim
input->in_fd = 2;
stream_close(&input->read_stream, NULL);
loop_push_event(&loop,
(Event) { .data = input, .handler = restart_reading }, false);
queue_put(loop.fast_events, restart_reading, 1, input);
} else {
input_done();
}
@ -272,9 +272,9 @@ static void read_cb(Stream *stream, RBuffer *buf, void *data, bool eof)
rbuffer_reset(input->read_stream.buffer);
}
static void restart_reading(Event event)
static void restart_reading(void **argv)
{
TermInput *input = event.data;
TermInput *input = argv[0];
rstream_init_fd(&loop, &input->read_stream, input->in_fd, 0xfff, input);
rstream_start(&input->read_stream, read_cb);
}

View File

@ -205,22 +205,11 @@ static void tui_stop(UI *ui)
xfree(ui);
}
static void try_resize(Event ev)
{
UI *ui = ev.data;
update_size(ui);
ui_refresh();
}
static void sigwinch_cb(SignalWatcher *watcher, int signum, void *data)
{
got_winch = true;
// Queue the event because resizing can result in recursive event_poll calls
// FIXME(blueyed): TUI does not resize properly when not deferred. Why? #2322
loop_push_event(&loop, (Event) {
.data = data,
.handler = try_resize
}, true);
UI *ui = data;
update_size(ui);
ui_refresh();
}
static bool attrs_differ(HlAttrs a1, HlAttrs a2)

View File

@ -214,9 +214,9 @@ void ui_detach(UI *ui)
shift_index++;
}
ui_count--;
// schedule a refresh
loop_push_event(&loop, (Event) { .handler = refresh }, false);
if (--ui_count) {
ui_refresh();
}
}
void ui_clear(void)
@ -485,11 +485,3 @@ static void ui_mode_change(void)
UI_CALL(mode_change, mode);
conceal_check_cursur_line();
}
static void refresh(Event event)
{
if (ui_count) {
ui_refresh();
}
}

View File

@ -369,6 +369,7 @@ describe('clipboard usage', function()
[2] = {foreground = Screen.colors.Blue},
[3] = {bold = true, foreground = Screen.colors.SeaGreen}},
{{bold = true, foreground = Screen.colors.Blue}})
feed('<cr>') -- clear out of Press ENTER screen
end)
it('can paste "* to the commandline', function()

View File

@ -0,0 +1,16 @@
#include <string.h>
#include <stdlib.h>
#include "nvim/event/queue.h"
#include "queue.h"
void ut_queue_put(Queue *queue, const char *str)
{
queue_put(queue, NULL, 1, str);
}
const char *ut_queue_get(Queue *queue)
{
Event event = queue_get(queue);
return event.argv[0];
}

View File

@ -0,0 +1,4 @@
#include "nvim/event/queue.h"
void ut_queue_put(Queue *queue, const char *str);
const char *ut_queue_get(Queue *queue);

123
test/unit/queue_spec.lua Normal file
View File

@ -0,0 +1,123 @@
local helpers = require("test.unit.helpers")
local ffi = helpers.ffi
local eq = helpers.eq
local queue = helpers.cimport("./test/unit/fixtures/queue.h")
describe('queue', function()
local parent, child1, child2, child3
local function put(q, str)
queue.ut_queue_put(q, str)
end
local function get(q)
return ffi.string(queue.ut_queue_get(q))
end
local function free(q)
queue.queue_free(q)
end
before_each(function()
parent = queue.queue_new_parent(ffi.NULL, ffi.NULL)
child1 = queue.queue_new_child(parent)
child2 = queue.queue_new_child(parent)
child3 = queue.queue_new_child(parent)
put(child1, 'c1i1')
put(child1, 'c1i2')
put(child2, 'c2i1')
put(child1, 'c1i3')
put(child2, 'c2i2')
put(child2, 'c2i3')
put(child2, 'c2i4')
put(child3, 'c3i1')
put(child3, 'c3i2')
end)
it('removing from parent removes from child', function()
eq('c1i1', get(parent))
eq('c1i2', get(parent))
eq('c2i1', get(parent))
eq('c1i3', get(parent))
eq('c2i2', get(parent))
eq('c2i3', get(parent))
eq('c2i4', get(parent))
end)
it('removing from child removes from parent', function()
eq('c2i1', get(child2))
eq('c2i2', get(child2))
eq('c1i1', get(child1))
eq('c1i2', get(parent))
eq('c1i3', get(parent))
eq('c2i3', get(parent))
eq('c2i4', get(parent))
end)
it('removing from child at the beginning of parent', function()
eq('c1i1', get(child1))
eq('c1i2', get(child1))
eq('c2i1', get(parent))
end)
it('removing from parent after get from parent and put to child', function()
eq('c1i1', get(parent))
eq('c1i2', get(parent))
eq('c2i1', get(parent))
eq('c1i3', get(parent))
eq('c2i2', get(parent))
eq('c2i3', get(parent))
eq('c2i4', get(parent))
eq('c3i1', get(parent))
put(child1, 'c1i11')
put(child1, 'c1i22')
eq('c3i2', get(parent))
eq('c1i11', get(parent))
eq('c1i22', get(parent))
end)
it('removing from parent after get and put to child', function()
eq('c1i1', get(child1))
eq('c1i2', get(child1))
eq('c2i1', get(child2))
eq('c1i3', get(child1))
eq('c2i2', get(child2))
eq('c2i3', get(child2))
eq('c2i4', get(child2))
eq('c3i1', get(child3))
eq('c3i2', get(parent))
put(child1, 'c1i11')
put(child2, 'c2i11')
put(child1, 'c1i12')
eq('c2i11', get(child2))
eq('c1i11', get(parent))
eq('c1i12', get(parent))
end)
it('put after removing from child at the end of parent', function()
eq('c3i1', get(child3))
eq('c3i2', get(child3))
put(child1, 'c1i11')
put(child2, 'c2i11')
eq('c1i1', get(parent))
eq('c1i2', get(parent))
eq('c2i1', get(parent))
eq('c1i3', get(parent))
eq('c2i2', get(parent))
eq('c2i3', get(parent))
eq('c2i4', get(parent))
eq('c1i11', get(parent))
eq('c2i11', get(parent))
end)
it('removes from parent queue when child is freed', function()
free(child2)
eq('c1i1', get(parent))
eq('c1i2', get(parent))
eq('c1i3', get(parent))
eq('c3i1', get(child3))
eq('c3i2', get(child3))
end)
end)