mirror of
https://github.com/neovim/neovim.git
synced 2025-02-25 18:55:25 -06:00
rpc: Don't delay notifications when request is pending (#6544)
With the old behavior, if a GUI makes a blocking request that requires user
interaction (like nvim_input()), it would not get any screen updates.
The client, not nvim, should decide how to handle notifications during a
pending request. If an rplugin wants to avoid async calls while a sync call is
busy, it likely wants to avoid processing async calls while another async call
also is handled as well.
This may break the expectation of some existing rplugins. For compatibility,
remote/define.vim reimplements the old behavior. Clients can opt-out by
specifying `sync=urgent`.
- Legacy hosts should be updated to use `sync=urgent`. They could add a flag
indicating which async methods are always safe to call and which must wait
until the main loop returns.
- New hosts can expose the full asyncness, they don't need to offer both
behaviors.
ref #6532
ref #1398 d83868fe90
This commit is contained in:
parent
3a938fff09
commit
2a3bcd1ff8
@ -169,14 +169,40 @@ function! remote#define#FunctionOnChannel(channel, method, sync, name, opts)
|
|||||||
exe function_def
|
exe function_def
|
||||||
endfunction
|
endfunction
|
||||||
|
|
||||||
|
let s:busy = {}
|
||||||
|
let s:pending_notifications = {}
|
||||||
|
|
||||||
function! s:GetRpcFunction(sync)
|
function! s:GetRpcFunction(sync)
|
||||||
if a:sync
|
if a:sync ==# 'urgent'
|
||||||
return 'rpcrequest'
|
return 'rpcnotify'
|
||||||
|
elseif a:sync
|
||||||
|
return 'remote#define#request'
|
||||||
endif
|
endif
|
||||||
return 'rpcnotify'
|
return 'remote#define#notify'
|
||||||
endfunction
|
endfunction
|
||||||
|
|
||||||
|
function! remote#define#notify(chan, ...)
|
||||||
|
if get(s:busy, a:chan, 0) > 0
|
||||||
|
let pending = get(s:pending_notifications, a:chan, [])
|
||||||
|
call add(pending, deepcopy(a:000))
|
||||||
|
let s:pending_notifications[a:chan] = pending
|
||||||
|
else
|
||||||
|
call call('rpcnotify', [a:chan] + a:000)
|
||||||
|
endif
|
||||||
|
endfunction
|
||||||
|
|
||||||
|
function! remote#define#request(chan, ...)
|
||||||
|
let s:busy[a:chan] = get(s:busy, a:chan, 0)+1
|
||||||
|
let val = call('rpcrequest', [a:chan]+a:000)
|
||||||
|
let s:busy[a:chan] -= 1
|
||||||
|
if s:busy[a:chan] == 0
|
||||||
|
for msg in get(s:pending_notifications, a:chan, [])
|
||||||
|
call call('rpcnotify', [a:chan] + msg)
|
||||||
|
endfor
|
||||||
|
let s:pending_notifications[a:chan] = []
|
||||||
|
endif
|
||||||
|
return val
|
||||||
|
endfunction
|
||||||
|
|
||||||
function! s:GetCommandPrefix(name, opts)
|
function! s:GetCommandPrefix(name, opts)
|
||||||
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',
|
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',
|
||||||
|
@ -2531,6 +2531,7 @@ def CheckSpacing(filename, clean_lines, linenum, nesting_state, error):
|
|||||||
r'(?<!\bkhash_t)'
|
r'(?<!\bkhash_t)'
|
||||||
r'(?<!\bkbtree_t)'
|
r'(?<!\bkbtree_t)'
|
||||||
r'(?<!\bkbitr_t)'
|
r'(?<!\bkbitr_t)'
|
||||||
|
r'(?<!\bPMap)'
|
||||||
r'\((?:const )?(?:struct )?[a-zA-Z_]\w*(?: *\*(?:const)?)*\)'
|
r'\((?:const )?(?:struct )?[a-zA-Z_]\w*(?: *\*(?:const)?)*\)'
|
||||||
r' +'
|
r' +'
|
||||||
r'-?(?:\*+|&)?(?:\w+|\+\+|--|\()', cast_line)
|
r'-?(?:\*+|&)?(?:\w+|\+\+|--|\()', cast_line)
|
||||||
|
@ -56,7 +56,6 @@ typedef struct {
|
|||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
size_t refcount;
|
size_t refcount;
|
||||||
size_t pending_requests;
|
|
||||||
PMap(cstr_t) *subscribed_events;
|
PMap(cstr_t) *subscribed_events;
|
||||||
bool closed;
|
bool closed;
|
||||||
ChannelType type;
|
ChannelType type;
|
||||||
@ -71,7 +70,6 @@ typedef struct {
|
|||||||
} data;
|
} data;
|
||||||
uint64_t next_request_id;
|
uint64_t next_request_id;
|
||||||
kvec_t(ChannelCallFrame *) call_stack;
|
kvec_t(ChannelCallFrame *) call_stack;
|
||||||
kvec_t(WBuffer *) delayed_notifications;
|
|
||||||
MultiQueue *events;
|
MultiQueue *events;
|
||||||
} Channel;
|
} Channel;
|
||||||
|
|
||||||
@ -205,14 +203,7 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (channel) {
|
if (channel) {
|
||||||
if (channel->pending_requests) {
|
send_event(channel, name, args);
|
||||||
// Pending request, queue the notification for later sending.
|
|
||||||
const String method = cstr_as_string((char *)name);
|
|
||||||
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
|
|
||||||
kv_push(channel->delayed_notifications, buffer);
|
|
||||||
} else {
|
|
||||||
send_event(channel, name, args);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
broadcast_event(name, args);
|
broadcast_event(name, args);
|
||||||
}
|
}
|
||||||
@ -248,10 +239,8 @@ Object channel_send_call(uint64_t id,
|
|||||||
// Push the frame
|
// Push the frame
|
||||||
ChannelCallFrame frame = { request_id, false, false, NIL };
|
ChannelCallFrame frame = { request_id, false, false, NIL };
|
||||||
kv_push(channel->call_stack, &frame);
|
kv_push(channel->call_stack, &frame);
|
||||||
channel->pending_requests++;
|
|
||||||
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
|
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
|
||||||
(void)kv_pop(channel->call_stack);
|
(void)kv_pop(channel->call_stack);
|
||||||
channel->pending_requests--;
|
|
||||||
|
|
||||||
if (frame.errored) {
|
if (frame.errored) {
|
||||||
if (frame.result.type == kObjectTypeString) {
|
if (frame.result.type == kObjectTypeString) {
|
||||||
@ -276,10 +265,6 @@ Object channel_send_call(uint64_t id,
|
|||||||
api_free_object(frame.result);
|
api_free_object(frame.result);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!channel->pending_requests) {
|
|
||||||
send_delayed_notifications(channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
decref(channel);
|
decref(channel);
|
||||||
|
|
||||||
return frame.errored ? NIL : frame.result;
|
return frame.errored ? NIL : frame.result;
|
||||||
@ -704,11 +689,7 @@ static void broadcast_event(const char *name, Array args)
|
|||||||
|
|
||||||
for (size_t i = 0; i < kv_size(subscribed); i++) {
|
for (size_t i = 0; i < kv_size(subscribed); i++) {
|
||||||
Channel *channel = kv_A(subscribed, i);
|
Channel *channel = kv_A(subscribed, i);
|
||||||
if (channel->pending_requests) {
|
channel_write(channel, buffer);
|
||||||
kv_push(channel->delayed_notifications, buffer);
|
|
||||||
} else {
|
|
||||||
channel_write(channel, buffer);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
@ -786,7 +767,6 @@ static void free_channel(Channel *channel)
|
|||||||
|
|
||||||
pmap_free(cstr_t)(channel->subscribed_events);
|
pmap_free(cstr_t)(channel->subscribed_events);
|
||||||
kv_destroy(channel->call_stack);
|
kv_destroy(channel->call_stack);
|
||||||
kv_destroy(channel->delayed_notifications);
|
|
||||||
if (channel->type != kChannelTypeProc) {
|
if (channel->type != kChannelTypeProc) {
|
||||||
multiqueue_free(channel->events);
|
multiqueue_free(channel->events);
|
||||||
}
|
}
|
||||||
@ -811,11 +791,9 @@ static Channel *register_channel(ChannelType type, uint64_t id,
|
|||||||
rv->closed = false;
|
rv->closed = false;
|
||||||
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||||
rv->id = id > 0 ? id : next_chan_id++;
|
rv->id = id > 0 ? id : next_chan_id++;
|
||||||
rv->pending_requests = 0;
|
|
||||||
rv->subscribed_events = pmap_new(cstr_t)();
|
rv->subscribed_events = pmap_new(cstr_t)();
|
||||||
rv->next_request_id = 1;
|
rv->next_request_id = 1;
|
||||||
kv_init(rv->call_stack);
|
kv_init(rv->call_stack);
|
||||||
kv_init(rv->delayed_notifications);
|
|
||||||
pmap_put(uint64_t)(channels, rv->id, rv);
|
pmap_put(uint64_t)(channels, rv->id, rv);
|
||||||
|
|
||||||
ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
|
ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
|
||||||
@ -912,16 +890,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
|
|||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void send_delayed_notifications(Channel* channel)
|
|
||||||
{
|
|
||||||
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
|
|
||||||
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
|
|
||||||
channel_write(channel, buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
kv_size(channel->delayed_notifications) = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void incref(Channel *channel)
|
static void incref(Channel *channel)
|
||||||
{
|
{
|
||||||
channel->refcount++;
|
channel->refcount++;
|
||||||
|
@ -109,7 +109,28 @@ describe('server -> client', function()
|
|||||||
end)
|
end)
|
||||||
|
|
||||||
describe('requests and notifications interleaved', function()
|
describe('requests and notifications interleaved', function()
|
||||||
-- This tests that the following scenario won't happen:
|
it('does not delay notifications during pending request', function()
|
||||||
|
local received = false
|
||||||
|
local function on_setup()
|
||||||
|
eq("retval", funcs.rpcrequest(cid, "doit"))
|
||||||
|
stop()
|
||||||
|
end
|
||||||
|
local function on_request(method)
|
||||||
|
if method == "doit" then
|
||||||
|
funcs.rpcnotify(cid, "headsup")
|
||||||
|
eq(true,received)
|
||||||
|
return "retval"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
local function on_notification(method)
|
||||||
|
if method == "headsup" then
|
||||||
|
received = true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
run(on_request, on_notification, on_setup)
|
||||||
|
end)
|
||||||
|
|
||||||
|
-- This tests the following scenario:
|
||||||
--
|
--
|
||||||
-- server->client [request ] (1)
|
-- server->client [request ] (1)
|
||||||
-- client->server [request ] (2) triggered by (1)
|
-- client->server [request ] (2) triggered by (1)
|
||||||
@ -124,36 +145,38 @@ describe('server -> client', function()
|
|||||||
-- only deals with one server->client request at a time. (In other words,
|
-- only deals with one server->client request at a time. (In other words,
|
||||||
-- the client cannot send a response to a request that is not at the top
|
-- the client cannot send a response to a request that is not at the top
|
||||||
-- of nvim's request stack).
|
-- of nvim's request stack).
|
||||||
--
|
pending('will close connection if not properly synchronized', function()
|
||||||
-- But above scenario shoudn't happen by the way notifications are dealt in
|
|
||||||
-- Nvim: they are only sent after there are no pending server->client
|
|
||||||
-- request(the request stack fully unwinds). So (3) is only sent after the
|
|
||||||
-- client returns (6).
|
|
||||||
it('works', function()
|
|
||||||
local expected = 300
|
|
||||||
local notified = 0
|
|
||||||
local function on_setup()
|
local function on_setup()
|
||||||
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
|
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
|
||||||
end
|
end
|
||||||
|
|
||||||
local function on_request(method)
|
local function on_request(method)
|
||||||
eq('notify', method)
|
if method == "notify" then
|
||||||
eq(1, eval('rpcnotify('..cid..', "notification")'))
|
eq(1, eval('rpcnotify('..cid..', "notification")'))
|
||||||
return 'notified!'
|
return 'notified!'
|
||||||
|
elseif method == "nested" then
|
||||||
|
-- do some busywork, so the first request will return
|
||||||
|
-- before this one
|
||||||
|
for _ = 1, 5 do
|
||||||
|
eq(2, eval("1+1"))
|
||||||
|
end
|
||||||
|
eq(1, eval('rpcnotify('..cid..', "nested_done")'))
|
||||||
|
return 'done!'
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
local function on_notification(method)
|
local function on_notification(method)
|
||||||
eq('notification', method)
|
if method == "notification" then
|
||||||
if notified == expected then
|
eq('done!', eval('rpcrequest('..cid..', "nested")'))
|
||||||
stop()
|
elseif method == "nested_done" then
|
||||||
return
|
-- this should never have been sent
|
||||||
|
ok(false)
|
||||||
end
|
end
|
||||||
notified = notified + 1
|
|
||||||
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
|
|
||||||
end
|
end
|
||||||
|
|
||||||
run(on_request, on_notification, on_setup)
|
run(on_request, on_notification, on_setup)
|
||||||
eq(expected, notified)
|
-- ignore disconnect failure, otherwise detected by after_each
|
||||||
|
clear()
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user