Merge PR #1369 'Assert libuv event loop is properly cleaned up'

This commit is contained in:
Thiago de Arruda 2014-11-02 20:23:42 -03:00
commit 5a52bcfa22
9 changed files with 142 additions and 125 deletions

View File

@ -19541,7 +19541,7 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
event_push((Event) { \
.handler = on_job_event, \
.data = event_data \
}); \
}, true); \
} while(0)
static void on_job_stdout(RStream *rstream, void *data, bool eof)

View File

@ -435,13 +435,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
Array args = ARRAY_DICT_INIT;
msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
if (kv_size(channel->call_stack) || !handler.defer) {
call_request_handler(channel, handler, args, request_id);
return;
}
// Defer calling the request handler.
bool defer = (!kv_size(channel->call_stack) && handler.defer);
RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
event_data->channel = channel;
event_data->handler = handler;
@ -450,21 +444,16 @@ static void handle_request(Channel *channel, msgpack_object *request)
event_push((Event) {
.handler = on_request_event,
.data = event_data
});
}, defer);
}
static void on_request_event(Event event)
{
RequestEvent *e = event.data;
call_request_handler(e->channel, e->handler, e->args, e->request_id);
kmp_free(RequestEventPool, request_event_pool, e);
}
static void call_request_handler(Channel *channel,
MsgpackRpcRequestHandler handler,
Array args,
uint64_t request_id)
{
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
uint64_t request_id = e->request_id;
Error error = ERROR_INIT;
Object result = handler.fn(channel->id, request_id, args, &error);
// send the response
@ -477,6 +466,7 @@ static void call_request_handler(Channel *channel,
&out_buffer));
// All arguments were freed already, but we still need to free the array
free(args.items);
kmp_free(RequestEventPool, request_event_pool, e);
}
static bool channel_write(Channel *channel, WBuffer *buffer)

View File

@ -11,7 +11,7 @@
#include "nvim/ascii.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/log.h"
#include "nvim/tempfile.h"
#include "nvim/map.h"
#include "nvim/path.h"
@ -102,12 +102,12 @@ int server_start(const char *endpoint)
if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) {
// TODO(aktau): since this is not what the user wanted, perhaps we
// should return an error here
EMSG2("Address was too long, truncated to %s", addr);
WLOG("Address was too long, truncated to %s", addr);
}
// Check if the server already exists
if (pmap_has(cstr_t)(servers, addr)) {
EMSG2("Already listening on %s", addr);
ELOG("Already listening on %s", addr);
return 1;
}
@ -152,38 +152,30 @@ int server_start(const char *endpoint)
}
int result;
uv_stream_t *stream = NULL;
if (server_type == kServerTypeTcp) {
// Listen on tcp address/port
uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle);
server->socket.tcp.handle.data = server;
result = uv_tcp_bind(&server->socket.tcp.handle,
(const struct sockaddr *)&server->socket.tcp.addr,
0);
if (result == 0) {
result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
MAX_CONNECTIONS,
connection_cb);
if (result) {
uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
}
}
stream = (uv_stream_t *)&server->socket.tcp.handle;
} else {
// Listen on named pipe or unix socket
xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr));
uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
server->socket.pipe.handle.data = server;
result = uv_pipe_bind(&server->socket.pipe.handle,
server->socket.pipe.addr);
if (result == 0) {
result = uv_listen((uv_stream_t *)&server->socket.pipe.handle,
MAX_CONNECTIONS,
connection_cb);
stream = (uv_stream_t *)&server->socket.pipe.handle;
}
if (result) {
uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
}
}
stream->data = server;
if (result == 0) {
result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
MAX_CONNECTIONS,
connection_cb);
}
assert(result <= 0); // libuv should have returned -errno or zero.
@ -196,13 +188,12 @@ int server_start(const char *endpoint)
result = -ENOENT;
}
}
EMSG2("Failed to start server: %s", uv_strerror(result));
free(server);
uv_close((uv_handle_t *)stream, free_server);
ELOG("Failed to start server: %s", uv_strerror(result));
return result;
}
server->type = server_type;
// Add the server to the hash table
pmap_put(cstr_t)(servers, addr, server);
@ -221,7 +212,7 @@ void server_stop(char *endpoint)
xstrlcpy(addr, endpoint, sizeof(addr));
if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) {
EMSG2("Not listening on %s", addr);
ELOG("Not listening on %s", addr);
return;
}
@ -255,7 +246,7 @@ static void connection_cb(uv_stream_t *server, int status)
result = uv_accept(server, client);
if (result) {
EMSG2("Failed to accept connection: %s", uv_strerror(result));
ELOG("Failed to accept connection: %s", uv_strerror(result));
uv_close((uv_handle_t *)client, free_client);
return;
}

View File

@ -14,6 +14,7 @@
#include "nvim/os/provider.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream.h"
#include "nvim/os/wstream.h"
#include "nvim/os/job.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
@ -34,17 +35,24 @@ typedef struct {
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
static klist_t(Event) *pending_events;
// deferred_events: Events that should be processed as the K_EVENT special key
// immediate_events: Events that should be processed after exiting libuv event
// loop(to avoid recursion), but before returning from
// `event_poll`
static klist_t(Event) *deferred_events, *immediate_events;
void event_init(void)
{
// Initialize the event queues
deferred_events = kl_init(Event);
immediate_events = kl_init(Event);
// early msgpack-rpc initialization
msgpack_rpc_init_method_table();
msgpack_rpc_helpers_init();
// Initialize the event queues
pending_events = kl_init(Event);
wstream_init();
// Initialize input events
input_init();
input_start();
// Timer to wake the event loop if a timeout argument is passed to
// `event_poll`
// Signals
@ -63,20 +71,27 @@ void event_teardown(void)
channel_teardown();
job_teardown();
server_teardown();
signal_teardown();
input_stop();
input_teardown();
do {
// This will loop forever if we leave any unclosed handles. Currently it is
// the most reliable way to use travis for verifying the no libuv-related
// bugs(which can be hard to track later) were introduced on a PR.
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
} while (uv_loop_close(uv_default_loop()));
}
// Wait for some event
void event_poll(int ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
static int recursive = 0;
if (!(recursive++)) {
// Only needs to start the libuv handle the first time we enter here
input_start();
if (recursive++) {
abort(); // Should not re-enter uv_run
}
uv_run_mode run_mode = UV_RUN_ONCE;
uv_timer_t timer;
uv_prepare_t timer_prepare;
TimerData timer_data = {.ms = ms, .timed_out = false, .timer = &timer};
@ -99,37 +114,41 @@ void event_poll(int ms)
loop(run_mode);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
input_stop();
}
if (ms > 0) {
// Ensure the timer-related handles are closed and run the event loop
// once more to let libuv perform it's cleanup
uv_timer_stop(&timer);
uv_prepare_stop(&timer_prepare);
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
loop(UV_RUN_NOWAIT);
}
recursive--; // Can re-enter uv_run now
process_events_from(immediate_events);
}
bool event_has_deferred(void)
{
return !kl_empty(pending_events);
return !kl_empty(deferred_events);
}
// Queue an event
void event_push(Event event)
void event_push(Event event, bool deferred)
{
*kl_pushp(Event, pending_events) = event;
*kl_pushp(Event, deferred ? deferred_events : immediate_events) = event;
}
void event_process(void)
{
process_events_from(deferred_events);
}
static void process_events_from(klist_t(Event) *queue)
{
Event event;
while (kl_shift(Event, pending_events, &event) == 0) {
while (kl_shift(Event, queue, &event) == 0) {
event.handler(event);
}
}

View File

@ -52,6 +52,15 @@ void input_init(void)
rstream_set_file(read_stream, read_cmd_fd);
}
void input_teardown(void)
{
if (embedded_mode) {
return;
}
rstream_free(read_stream);
}
// Listen for input
void input_start(void)
{

View File

@ -88,51 +88,18 @@ void job_init(void)
/// Releases job control resources and terminates running jobs
void job_teardown(void)
{
// 20 tries will give processes about 1 sec to exit cleanly
uint32_t remaining_tries = 20;
bool all_dead = true;
int i;
Job *job;
// Politely ask each job to terminate
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
// Stop all jobs
for (int i = 0; i < MAX_RUNNING_JOBS; i++) {
Job *job;
if ((job = table[i]) != NULL) {
all_dead = false;
uv_process_kill(&job->proc, SIGTERM);
job_stop(job);
}
}
if (all_dead) {
return;
}
os_delay(10, 0);
// Right now any exited process are zombies waiting for us to acknowledge
// their status with `wait` or handling SIGCHLD. libuv does that
// automatically (and then calls `exit_cb`) but we have to give it a chance
// by running the loop one more time
event_poll(0);
// Prepare to start shooting
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
job = table[i];
// Still alive
while (job && is_alive(job) && remaining_tries--) {
os_delay(50, 0);
// Acknowledge child exits
event_poll(0);
// It's possible that the event_poll call removed the job from the table,
// reset 'job' so the next iteration won't run in that case.
job = table[i];
}
if (job && is_alive(job)) {
uv_process_kill(&job->proc, SIGKILL);
}
}
// Last run to ensure all children were removed
event_poll(0);
// Wait until all jobs are closed
event_poll_until(-1, !stop_requests);
// Close the timer
uv_close((uv_handle_t *)&job_stop_timer, NULL);
}
/// Tries to start a new job.
@ -427,18 +394,13 @@ static void job_exit_callback(Job *job)
job->exit_cb(job, job->data);
}
if (!--stop_requests) {
if (stop_requests && !--stop_requests) {
// Stop the timer if no more stop requests are pending
DLOG("Stopping job kill timer");
uv_timer_stop(&job_stop_timer);
}
}
static bool is_alive(Job *job)
{
return uv_process_kill(&job->proc, 0) == 0;
}
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_stop_timer_cb(uv_timer_t *handle)

View File

@ -2,6 +2,8 @@
#include <uv.h>
#include "nvim/lib/klist.h"
#include "nvim/types.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
@ -13,6 +15,11 @@
#include "nvim/misc1.h"
#include "nvim/misc2.h"
#include "nvim/os/signal.h"
#include "nvim/os/event.h"
#define SignalEventFreer(x)
KMEMPOOL_INIT(SignalEventPool, int, SignalEventFreer)
kmempool_t(SignalEventPool) *signal_event_pool = NULL;
static uv_signal_t sint, spipe, shup, squit, sterm, swinch;
#ifdef SIGPWR
@ -26,6 +33,7 @@ static bool rejecting_deadly;
#endif
void signal_init(void)
{
signal_event_pool = kmp_init(SignalEventPool);
uv_signal_init(uv_default_loop(), &sint);
uv_signal_init(uv_default_loop(), &spipe);
uv_signal_init(uv_default_loop(), &shup);
@ -47,6 +55,20 @@ void signal_init(void)
#endif
}
void signal_teardown(void)
{
signal_stop();
uv_close((uv_handle_t *)&sint, NULL);
uv_close((uv_handle_t *)&spipe, NULL);
uv_close((uv_handle_t *)&shup, NULL);
uv_close((uv_handle_t *)&squit, NULL);
uv_close((uv_handle_t *)&sterm, NULL);
uv_close((uv_handle_t *)&swinch, NULL);
#ifdef SIGPWR
uv_close((uv_handle_t *)&spwr, NULL);
#endif
}
void signal_stop(void)
{
uv_signal_stop(&sint);
@ -113,6 +135,19 @@ static void deadly_signal(int signum)
static void signal_cb(uv_signal_t *handle, int signum)
{
int *n = kmp_alloc(SignalEventPool, signal_event_pool);
*n = signum;
event_push((Event) {
.handler = on_signal_event,
.data = n
}, false);
}
static void on_signal_event(Event event)
{
int signum = *((int *)event.data);
kmp_free(SignalEventPool, signal_event_pool, event.data);
switch (signum) {
case SIGINT:
got_int = true;
@ -142,3 +177,4 @@ static void signal_cb(uv_signal_t *handle, int signum)
break;
}
}

View File

@ -5,6 +5,8 @@
#include <uv.h>
#include "nvim/lib/klist.h"
#include "nvim/os/uv_helpers.h"
#include "nvim/os/wstream.h"
#include "nvim/os/wstream_defs.h"
@ -36,13 +38,27 @@ struct wbuffer {
typedef struct {
WStream *wstream;
WBuffer *buffer;
} WriteData;
uv_write_t uv_req;
} WRequest;
#define WRequestFreer(x)
KMEMPOOL_INIT(WRequestPool, WRequest, WRequestFreer)
kmempool_t(WRequestPool) *wrequest_pool = NULL;
#define WBufferFreer(x)
KMEMPOOL_INIT(WBufferPool, WBuffer, WBufferFreer)
kmempool_t(WBufferPool) *wbuffer_pool = NULL;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/wstream.c.generated.h"
#endif
/// Initialize pools for reusing commonly created objects
void wstream_init(void)
{
wrequest_pool = kmp_init(WRequestPool);
wbuffer_pool = kmp_init(WBufferPool);
}
/// Creates a new WStream instance. A WStream encapsulates all the boilerplate
/// necessary for writing to a libuv stream.
///
@ -148,20 +164,17 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
wstream->curmem += buffer->size;
WriteData *data = xmalloc(sizeof(WriteData));
WRequest *data = kmp_alloc(WRequestPool, wrequest_pool);
data->wstream = wstream;
data->buffer = buffer;
uv_write_t *req = xmalloc(sizeof(uv_write_t));
req->data = data;
data->uv_req.data = data;
uv_buf_t uvbuf;
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
free(data);
free(req);
if (uv_write(&data->uv_req, wstream->stream, &uvbuf, 1, write_cb)) {
kmp_free(WRequestPool, wrequest_pool, data);
goto err;
}
@ -190,7 +203,7 @@ WBuffer *wstream_new_buffer(char *data,
size_t refcount,
wbuffer_data_finalizer cb)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
WBuffer *rv = kmp_alloc(WBufferPool, wbuffer_pool);
rv->size = size;
rv->refcount = refcount;
rv->cb = cb;
@ -201,9 +214,8 @@ WBuffer *wstream_new_buffer(char *data,
static void write_cb(uv_write_t *req, int status)
{
WriteData *data = req->data;
WRequest *data = req->data;
free(req);
data->wstream->curmem -= data->buffer->size;
release_wbuffer(data->buffer);
@ -221,7 +233,7 @@ static void write_cb(uv_write_t *req, int status)
free(data->wstream);
}
free(data);
kmp_free(WRequestPool, wrequest_pool, data);
}
static void release_wbuffer(WBuffer *buffer)
@ -231,7 +243,7 @@ static void release_wbuffer(WBuffer *buffer)
buffer->cb(buffer->data);
}
free(buffer);
kmp_free(WBufferPool, wbuffer_pool, buffer);
}
}

View File

@ -522,8 +522,6 @@ void mch_exit(int r)
{
exiting = TRUE;
event_teardown();
{
settmode(TMODE_COOK);
mch_restore_title(3); /* restore xterm title and icon name */
@ -559,7 +557,7 @@ void mch_exit(int r)
mac_conv_cleanup();
#endif
event_teardown();
#ifdef EXITFREE
free_all_mem();