mirror of
https://github.com/neovim/neovim.git
synced 2025-02-25 18:55:25 -06:00
channel: Bugfixes and refactor
- All functions that require a channel id will fail when the channel was disabled - Rewrite `call_stack_unwind` as `call_set_error`. It will now disable the channel and set error on all frames. The stack will be unwinded automatically while the involved functions exit. - Remove `disable_channel` function. If channels are disabled, they will be closed as soon as possible
This commit is contained in:
parent
bce4c365bc
commit
0b2b1da0e8
@ -143,7 +143,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (id > 0) {
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id))) {
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
msgpack_rpc_free_object(arg);
|
||||
return false;
|
||||
}
|
||||
@ -163,7 +163,7 @@ bool channel_send_call(uint64_t id,
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id))) {
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
msgpack_rpc_free_object(arg);
|
||||
return false;
|
||||
}
|
||||
@ -178,6 +178,8 @@ bool channel_send_call(uint64_t id,
|
||||
"while processing a RPC call",
|
||||
channel->id);
|
||||
*result = STRING_OBJ(cstr_to_string(buf));
|
||||
msgpack_rpc_free_object(arg);
|
||||
return false;
|
||||
}
|
||||
|
||||
uint64_t request_id = channel->next_request_id++;
|
||||
@ -235,7 +237,7 @@ void channel_subscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id))) {
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
abort();
|
||||
}
|
||||
|
||||
@ -257,7 +259,7 @@ void channel_unsubscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id))) {
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
abort();
|
||||
}
|
||||
|
||||
@ -286,7 +288,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
||||
"Before returning from a RPC call, channel %" PRIu64 " was "
|
||||
"closed by the client",
|
||||
channel->id);
|
||||
disable_channel(channel, buf);
|
||||
call_set_error(channel, buf);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -316,7 +318,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
||||
" a matching id for the current RPC call. Ensure the client "
|
||||
" is properly synchronized",
|
||||
channel->id);
|
||||
call_stack_unwind(channel, buf, 1);
|
||||
call_set_error(channel, buf);
|
||||
}
|
||||
msgpack_unpacked_destroy(&unpacked);
|
||||
// Bail out from this event loop iteration
|
||||
@ -369,7 +371,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
"Before returning from a RPC call, channel %" PRIu64 " was "
|
||||
"closed due to a failed write",
|
||||
channel->id);
|
||||
disable_channel(channel, buf);
|
||||
call_set_error(channel, buf);
|
||||
}
|
||||
|
||||
return success;
|
||||
@ -450,6 +452,15 @@ static void close_channel(Channel *channel)
|
||||
pmap_del(uint64_t)(channels, channel->id);
|
||||
msgpack_unpacker_free(channel->unpacker);
|
||||
|
||||
// Unsubscribe from all events
|
||||
char *event_string;
|
||||
map_foreach_value(channel->subscribed_events, event_string, {
|
||||
unsubscribe(channel, event_string);
|
||||
});
|
||||
|
||||
pmap_free(cstr_t)(channel->subscribed_events);
|
||||
kv_destroy(channel->call_stack);
|
||||
|
||||
if (channel->is_job) {
|
||||
if (channel->data.job) {
|
||||
job_stop(channel->data.job);
|
||||
@ -460,14 +471,6 @@ static void close_channel(Channel *channel)
|
||||
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
|
||||
}
|
||||
|
||||
// Unsubscribe from all events
|
||||
char *event_string;
|
||||
map_foreach_value(channel->subscribed_events, event_string, {
|
||||
unsubscribe(channel, event_string);
|
||||
});
|
||||
|
||||
pmap_free(cstr_t)(channel->subscribed_events);
|
||||
kv_destroy(channel->call_stack);
|
||||
free(channel);
|
||||
}
|
||||
|
||||
@ -510,10 +513,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
|
||||
|
||||
static void call_stack_pop(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
ChannelCallFrame *frame = kv_A(channel->call_stack,
|
||||
kv_size(channel->call_stack) - 1);
|
||||
ChannelCallFrame *frame = kv_pop(channel->call_stack);
|
||||
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
|
||||
(void)kv_pop(channel->call_stack);
|
||||
|
||||
if (frame->errored) {
|
||||
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
|
||||
@ -522,24 +523,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
|
||||
}
|
||||
}
|
||||
|
||||
static void call_stack_unwind(Channel *channel, char *msg, int count)
|
||||
static void call_set_error(Channel *channel, char *msg)
|
||||
{
|
||||
while (kv_size(channel->call_stack) && count--) {
|
||||
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
|
||||
ChannelCallFrame *frame = kv_pop(channel->call_stack);
|
||||
frame->errored = true;
|
||||
frame->result = STRING_OBJ(cstr_to_string(msg));
|
||||
}
|
||||
}
|
||||
|
||||
static void disable_channel(Channel *channel, char *msg)
|
||||
{
|
||||
if (kv_size(channel->call_stack)) {
|
||||
// Channel is currently in the middle of a call, remove all frames and mark
|
||||
// it as "dead"
|
||||
channel->enabled = false;
|
||||
call_stack_unwind(channel, msg, -1);
|
||||
} else {
|
||||
// Safe to close it now
|
||||
close_channel(channel);
|
||||
}
|
||||
channel->enabled = false;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user