event: Reintroduce the immediate event queue

Commit @264e0d872c("Remove automatic event deferral") removed the immediate
event queue because event deferral now had to be explicit. The problem is that
while some events don't need to be deferred, they still can result in recursive
`event_poll` calls, and recursion is not supported by libuv. Examples of those
are msgpack-rpc requests while a server->client request is pending, or signals
which can call `mch_exit`(and that will result in `uv_run` calls).

To fix the problem, this reintroduces the immediate event queue for events that
can potentially result in event loop recursion. The non-deferred events are
still processed in `event_poll`, but only after `uv_run` returns.
This commit is contained in:
Thiago de Arruda 2014-11-02 16:37:08 -03:00
parent e378965a44
commit a1dd70b1d0
4 changed files with 54 additions and 33 deletions

View File

@ -19541,7 +19541,7 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
event_push((Event) { \
.handler = on_job_event, \
.data = event_data \
}); \
}, true); \
} while(0)
static void on_job_stdout(RStream *rstream, void *data, bool eof)

View File

@ -435,13 +435,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
Array args = ARRAY_DICT_INIT;
msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
if (kv_size(channel->call_stack) || !handler.defer) {
call_request_handler(channel, handler, args, request_id);
return;
}
// Defer calling the request handler.
bool defer = (!kv_size(channel->call_stack) && handler.defer);
RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
event_data->channel = channel;
event_data->handler = handler;
@ -450,21 +444,16 @@ static void handle_request(Channel *channel, msgpack_object *request)
event_push((Event) {
.handler = on_request_event,
.data = event_data
});
}, defer);
}
static void on_request_event(Event event)
{
RequestEvent *e = event.data;
call_request_handler(e->channel, e->handler, e->args, e->request_id);
kmp_free(RequestEventPool, request_event_pool, e);
}
static void call_request_handler(Channel *channel,
MsgpackRpcRequestHandler handler,
Array args,
uint64_t request_id)
{
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
uint64_t request_id = e->request_id;
Error error = ERROR_INIT;
Object result = handler.fn(channel->id, request_id, args, &error);
// send the response
@ -477,6 +466,7 @@ static void call_request_handler(Channel *channel,
&out_buffer));
// All arguments were freed already, but we still need to free the array
free(args.items);
kmp_free(RequestEventPool, request_event_pool, e);
}
static bool channel_write(Channel *channel, WBuffer *buffer)

View File

@ -35,18 +35,24 @@ typedef struct {
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
static klist_t(Event) *deferred_events;
// deferred_events: Events that should be processed as the K_EVENT special key
// immediate_events: Events that should be processed after exiting libuv event
// loop(to avoid recursion), but before returning from
// `event_poll`
static klist_t(Event) *deferred_events, *immediate_events;
void event_init(void)
{
// Initialize the event queues
deferred_events = kl_init(Event);
immediate_events = kl_init(Event);
// early msgpack-rpc initialization
msgpack_rpc_init_method_table();
msgpack_rpc_helpers_init();
wstream_init();
// Initialize input events
input_init();
input_start();
// Timer to wake the event loop if a timeout argument is passed to
// `event_poll`
// Signals
@ -65,20 +71,19 @@ void event_teardown(void)
channel_teardown();
job_teardown();
server_teardown();
input_stop();
}
// Wait for some event
void event_poll(int ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
static int recursive = 0;
if (!(recursive++)) {
// Only needs to start the libuv handle the first time we enter here
input_start();
if (recursive++) {
abort(); // Should not re-enter uv_run
}
uv_run_mode run_mode = UV_RUN_ONCE;
uv_timer_t timer;
uv_prepare_t timer_prepare;
TimerData timer_data = {.ms = ms, .timed_out = false, .timer = &timer};
@ -101,18 +106,18 @@ void event_poll(int ms)
loop(run_mode);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
input_stop();
}
if (ms > 0) {
// Ensure the timer-related handles are closed and run the event loop
// once more to let libuv perform it's cleanup
uv_timer_stop(&timer);
uv_prepare_stop(&timer_prepare);
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
loop(UV_RUN_NOWAIT);
}
recursive--; // Can re-enter uv_run now
process_events_from(immediate_events);
}
bool event_has_deferred(void)
@ -121,17 +126,21 @@ bool event_has_deferred(void)
}
// Queue an event
void event_push(Event event)
void event_push(Event event, bool deferred)
{
*kl_pushp(Event, deferred_events) = event;
*kl_pushp(Event, deferred ? deferred_events : immediate_events) = event;
}
void event_process(void)
{
process_events_from(deferred_events);
}
static void process_events_from(klist_t(Event) *queue)
{
Event event;
while (kl_shift(Event, deferred_events, &event) == 0) {
while (kl_shift(Event, queue, &event) == 0) {
event.handler(event);
}
}

View File

@ -2,6 +2,8 @@
#include <uv.h>
#include "nvim/lib/klist.h"
#include "nvim/types.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
@ -13,6 +15,11 @@
#include "nvim/misc1.h"
#include "nvim/misc2.h"
#include "nvim/os/signal.h"
#include "nvim/os/event.h"
#define SignalEventFreer(x)
KMEMPOOL_INIT(SignalEventPool, int, SignalEventFreer)
kmempool_t(SignalEventPool) *signal_event_pool = NULL;
static uv_signal_t sint, spipe, shup, squit, sterm, swinch;
#ifdef SIGPWR
@ -26,6 +33,7 @@ static bool rejecting_deadly;
#endif
void signal_init(void)
{
signal_event_pool = kmp_init(SignalEventPool);
uv_signal_init(uv_default_loop(), &sint);
uv_signal_init(uv_default_loop(), &spipe);
uv_signal_init(uv_default_loop(), &shup);
@ -113,6 +121,19 @@ static void deadly_signal(int signum)
static void signal_cb(uv_signal_t *handle, int signum)
{
int *n = kmp_alloc(SignalEventPool, signal_event_pool);
*n = signum;
event_push((Event) {
.handler = on_signal_event,
.data = n
}, false);
}
static void on_signal_event(Event event)
{
int signum = *((int *)event.data);
kmp_free(SignalEventPool, signal_event_pool, event.data);
switch (signum) {
case SIGINT:
got_int = true;
@ -142,3 +163,4 @@ static void signal_cb(uv_signal_t *handle, int signum)
break;
}
}