msgpack-rpc: Allow selective deferral API calls

Since all API functions now run immediately after a msgpack-rpc request is
parsed by libuv callbacks, a mechanism was added to override this behavior and
allow certain functions to run in Nvim main loop.

The mechanism is simple: Any API function tagged with the FUNC_ATTR_DEFERRED (a
"dummy" attribute only used by msgpack-gen.lua) will be called when Nvim main
loop receives a K_EVENT key.

To implement this mechanism it was necessary some restructuration on the
msgpack-rpc modules, especially in the msgpack-gen.lua script.
This commit is contained in:
Thiago de Arruda 2014-10-20 20:07:01 -03:00
parent 72f028abcb
commit 72e3e57bf1
7 changed files with 151 additions and 88 deletions

View File

@ -34,6 +34,8 @@ c_params = Ct(c_void + c_param_list)
c_proto = Ct(
Cg(c_type, 'return_type') * Cg(c_id, 'name') *
fill * P('(') * fill * Cg(c_params, 'parameters') * fill * P(')') *
Cg(Cc(false), 'deferred') *
(fill * Cg((P('FUNC_ATTR_DEFERRED') * Cc(true)), 'deferred') ^ -1) *
fill * P(';')
)
grammar = Ct((c_proto + c_comment + c_preproc + ws) ^ 1)
@ -159,9 +161,10 @@ for i = 1, #functions do
local fn = functions[i]
local args = {}
output:write('static Object handle_'..fn.name..'(uint64_t channel_id, msgpack_object *req, Error *error)')
output:write('static Object handle_'..fn.name..'(uint64_t channel_id, uint64_t request_id, Array args, Error *error)')
output:write('\n{')
output:write('\n DLOG("Received msgpack-rpc call to '..fn.name..'(request id: %" PRIu64 ")", req->via.array.ptr[1].via.u64);')
output:write('\n DLOG("Handling msgpack-rpc call to '..fn.name..'(request id: %" PRIu64 ")", request_id);')
output:write('\n Object ret = NIL;')
-- Declare/initialize variables that will hold converted arguments
for j = 1, #fn.parameters do
local param = fn.parameters[j]
@ -169,8 +172,8 @@ for i = 1, #functions do
output:write('\n '..param[1]..' '..converted..' api_init_'..string.lower(real_type(param[1]))..';')
end
output:write('\n')
output:write('\n if (req->via.array.ptr[3].via.array.size != '..#fn.parameters..') {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n if (args.size != '..#fn.parameters..') {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %zu", args.size);')
output:write('\n error->set = true;')
output:write('\n goto cleanup;')
output:write('\n }\n')
@ -179,14 +182,18 @@ for i = 1, #functions do
for j = 1, #fn.parameters do
local converted, convert_arg, param, arg
param = fn.parameters[j]
arg = '(req->via.array.ptr[3].via.array.ptr + '..(j - 1)..')'
converted = 'arg_'..j
convert_arg = 'msgpack_rpc_to_'..real_type(param[1]):lower()
output:write('\n if (!'..convert_arg..'('..arg..', &'..converted..')) {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
output:write('\n error->set = true;')
output:write('\n goto cleanup;')
output:write('\n }\n')
if real_type(param[1]) ~= 'Object' then
output:write('\n if (args.items['..(j - 1)..'].type != kObjectType'..real_type(param[1])..') {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
output:write('\n error->set = true;')
output:write('\n goto cleanup;')
output:write('\n }')
output:write('\n '..converted..' = args.items['..(j - 1)..'].data.'..real_type(param[1]):lower()..';\n')
else
output:write('\n '..converted..' = args.items['..(j - 1)..'];\n')
end
args[#args + 1] = converted
end
@ -228,7 +235,7 @@ for i = 1, #functions do
end
if fn.return_type ~= 'void' then
output:write('\n Object ret = '..string.upper(real_type(fn.return_type))..'_OBJ(rv);')
output:write('\n ret = '..string.upper(real_type(fn.return_type))..'_OBJ(rv);')
end
-- Now generate the cleanup label for freeing memory allocated for the
-- arguments
@ -238,20 +245,16 @@ for i = 1, #functions do
local param = fn.parameters[j]
output:write('\n api_free_'..string.lower(real_type(param[1]))..'(arg_'..j..');')
end
if fn.return_type ~= 'void' then
output:write('\n return ret;\n}\n\n');
else
output:write('\n return NIL;\n}\n\n');
end
output:write('\n return ret;\n}\n\n');
end
-- Generate a function that initializes method names with handler functions
output:write([[
static Map(String, rpc_method_handler_fn) *methods = NULL;
static Map(String, MsgpackRpcRequestHandler) *methods = NULL;
void msgpack_rpc_init_method_table(void)
{
methods = map_new(String, rpc_method_handler_fn)();
methods = map_new(String, MsgpackRpcRequestHandler)();
]])
@ -260,10 +263,11 @@ void msgpack_rpc_init_method_table(void)
local max_fname_len = 0
for i = 1, #functions do
local fn = functions[i]
output:write(' map_put(String, rpc_method_handler_fn)(methods, '..
output:write(' map_put(String, MsgpackRpcRequestHandler)(methods, '..
'(String) {.data = "'..fn.name..'", '..
'.size = sizeof("'..fn.name..'") - 1}, handle_'..
fn.name..');\n')
'.size = sizeof("'..fn.name..'") - 1}, '..
'(MsgpackRpcRequestHandler) {.fn = handle_'.. fn.name..
', .defer = '..tostring(fn.deferred)..'});\n')
if #fn.name > max_fname_len then
max_fname_len = #fn.name
@ -273,26 +277,21 @@ end
output:write('\n}\n\n')
output:write([[
Object msgpack_rpc_dispatch(uint64_t channel_id,
msgpack_object *req,
Error *error)
MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name,
size_t name_len)
{
msgpack_object method = req->via.array.ptr[2];
rpc_method_handler_fn handler = NULL;
String m = {
.data=(char *)name,
.size=min(name_len, ]]..max_fname_len..[[)
};
MsgpackRpcRequestHandler rv =
map_get(String, MsgpackRpcRequestHandler)(methods, m);
if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
]])
output:write(' handler = map_get(String, rpc_method_handler_fn)')
output:write('(methods, (String){.data=(char *)method.via.bin.ptr,')
output:write('.size=min(method.via.bin.size, '..max_fname_len..')});\n')
output:write([[
if (!rv.fn) {
rv.fn = msgpack_rpc_handle_missing_method;
}
if (!handler) {
handler = msgpack_rpc_handle_missing_method;
}
return handler(channel_id, req, error);
return rv;
}
]])

View File

@ -179,6 +179,7 @@
#endif
#ifdef DEFINE_FUNC_ATTRIBUTES
#define FUNC_ATTR_DEFERRED
#define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC
#define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x)
#define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y)

View File

@ -108,4 +108,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER)
MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(String, rpc_method_handler_fn, DEFAULT_INITIALIZER)
#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false}
MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER)

View File

@ -25,7 +25,7 @@ MAP_DECLS(cstr_t, uint64_t)
MAP_DECLS(cstr_t, ptr_t)
MAP_DECLS(ptr_t, ptr_t)
MAP_DECLS(uint64_t, ptr_t)
MAP_DECLS(String, rpc_method_handler_fn)
MAP_DECLS(String, MsgpackRpcRequestHandler)
#define map_new(T, U) map_##T##_##U##_new
#define map_free(T, U) map_##T##_##U##_free

View File

@ -5,6 +5,8 @@
#include <uv.h>
#include <msgpack.h>
#include "nvim/lib/klist.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/msgpack_rpc/channel.h"
@ -52,6 +54,17 @@ typedef struct {
kvec_t(ChannelCallFrame *) call_stack;
} Channel;
typedef struct {
Channel *channel;
MsgpackRpcRequestHandler handler;
Array args;
uint64_t request_id;
} RequestEvent;
#define RequestEventFreer(x)
KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer)
kmempool_t(RequestEventPool) *request_event_pool = NULL;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
@ -64,6 +77,7 @@ static msgpack_sbuffer out_buffer;
/// Initializes the module
void channel_init(void)
{
request_event_pool = kmp_init(RequestEventPool);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@ -352,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
goto end;
}
// Perform the call
WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
// write the response
if (!channel_write(channel, resp)) {
goto end;
}
handle_request(channel, &unpacked.data);
}
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
@ -387,6 +396,84 @@ end:
}
}
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
uint64_t request_id;
Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error);
if (error.set) {
// Validation failed, send response with error
channel_write(channel,
serialize_response(request_id, &error, NIL, &out_buffer));
return;
}
// Retrieve the request handler
MsgpackRpcRequestHandler handler;
msgpack_object method = request->via.array.ptr[2];
if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
handler = msgpack_rpc_get_handler_for(method.via.bin.ptr,
method.via.bin.size);
} else {
handler.fn = msgpack_rpc_handle_missing_method;
handler.defer = false;
}
Array args;
msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
if (kv_size(channel->call_stack) || !handler.defer) {
call_request_handler(channel, handler, args, request_id);
return;
}
// Defer calling the request handler.
RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
event_push((Event) {
.handler = on_request_event,
.data = event_data
});
}
static void on_request_event(Event event)
{
RequestEvent *e = event.data;
call_request_handler(e->channel, e->handler, e->args, e->request_id);
kmp_free(RequestEventPool, request_event_pool, e);
}
static void call_request_handler(Channel *channel,
MsgpackRpcRequestHandler handler,
Array args,
uint64_t request_id)
{
Error error = ERROR_INIT;
Object result = handler.fn(channel->id, request_id, args, &error);
// send the response
msgpack_packer response;
msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
if (error.set) {
ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
error.msg,
request_id);
channel_write(channel,
serialize_response(request_id, &error, NIL, &out_buffer));
}
DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
request_id);
channel_write(channel,
serialize_response(request_id, &error, result, &out_buffer));
}
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;

View File

@ -6,9 +6,15 @@
/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores
/// functions of this type.
typedef Object (*rpc_method_handler_fn)(uint64_t channel_id,
msgpack_object *req,
Error *error);
typedef struct {
Object (*fn)(uint64_t channel_id,
uint64_t request_id,
Array args,
Error *error);
bool defer; // Should the call be deferred to the main loop? This should
// be true if the function mutates editor data structures such
// as buffers, windows, tabs, or if it executes vimscript code.
} MsgpackRpcRequestHandler;
/// Initializes the msgpack-rpc method table
void msgpack_rpc_init_method_table(void);
@ -31,4 +37,7 @@ Object msgpack_rpc_dispatch(uint64_t channel_id,
Error *error)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name,
size_t name_len)
FUNC_ATTR_NONNULL_ARG(1);
#endif // NVIM_MSGPACK_RPC_DEFS_H

View File

@ -140,10 +140,13 @@ bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
case MSGPACK_OBJECT_EXT:
switch (obj->via.ext.type) {
case kObjectTypeBuffer:
arg->type = kObjectTypeBuffer;
return msgpack_rpc_to_buffer(obj, &arg->data.buffer);
case kObjectTypeWindow:
arg->type = kObjectTypeWindow;
return msgpack_rpc_to_window(obj, &arg->data.window);
case kObjectTypeTabpage:
arg->type = kObjectTypeTabpage;
return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage);
}
default:
@ -292,44 +295,6 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
}
}
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
/// @param channel_id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
WBuffer *msgpack_rpc_call(uint64_t channel_id,
msgpack_object *req,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(2)
FUNC_ATTR_NONNULL_ARG(3)
{
uint64_t response_id;
Error error = ERROR_INIT;
msgpack_rpc_validate(&response_id, req, &error);
if (error.set) {
return serialize_response(response_id, &error, NIL, sbuffer);
}
// dispatch the call
Object rv = msgpack_rpc_dispatch(channel_id, req, &error);
// send the response
msgpack_packer response;
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
if (error.set) {
ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
error.msg,
response_id);
return serialize_response(response_id, &error, NIL, sbuffer);
}
DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
response_id);
return serialize_response(response_id, &error, rv, sbuffer);
}
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
@ -348,7 +313,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
/// Handler executed when an invalid method name is passed
Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
msgpack_object *req,
uint64_t request_id,
Array args,
Error *error)
{
snprintf(error->msg, sizeof(error->msg), "Invalid method name");