channel/msgpack_rpc: Refactor API dispatching

This is how API dispatching worked before this commit:

- The generated `msgpack_rpc_dispatch` function receives a the `msgpack_packer`
  argument.
- The response is incrementally built while validating/calling the API.
- Return values/errors are also packed into the `msgpack_packer` while the
  final response is being calculated.

Now the `msgpack_packer` argument is no longer provided, and the
`msgpack_rpc_dispatch` function returns `Object`/`Error` values to
`msgpack_rpc_call`, which will use those values to build the response in a
single pass.

This was done because the new `channel_send_call` function created the
possibility of having recursive API invocations, and this wasn't possible when
sharing a single `msgpack_sbuffer` across call frames(it was shared implicitly
through the `msgpack_packer` instance).

Since we only start to build the response when the necessary information has
been computed, it's now safe to share a single `msgpack_sbuffer` instance
across all channels and API invocations.

Some other changes also had to be performed:

- Handling of the metadata discover was moved to `msgpack_rpc_call`
- Expose more types as subtypes of `Object`, this was required to forward the
  return value from `msgpack_rpc_dispatch` to `msgpack_rpc_call`
- Added more helper macros for casting API types to `Object`
  any
This commit is contained in:
Thiago de Arruda 2014-06-23 11:52:42 -03:00
parent bc0380038e
commit 296da85198
8 changed files with 251 additions and 129 deletions

View File

@ -92,6 +92,7 @@ output:write([[
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
]])
for i = 1, #headers do
@ -121,20 +122,13 @@ output:write([[
};
const unsigned int msgpack_metadata_size = sizeof(msgpack_metadata);
void msgpack_rpc_dispatch(uint64_t channel_id, msgpack_object *req, msgpack_packer *res)
Object msgpack_rpc_dispatch(uint64_t channel_id,
uint64_t method_id,
msgpack_object *req,
Error *error)
{
Error error = { .set = false };
uint64_t method_id = (uint32_t)req->via.array.ptr[2].via.u64;
Object ret = NIL;
switch (method_id) {
case 0:
msgpack_pack_nil(res);
// The result is the [channel_id, metadata] array
msgpack_pack_array(res, 2);
msgpack_pack_uint64(res, channel_id);
msgpack_pack_raw(res, sizeof(msgpack_metadata));
msgpack_pack_raw_body(res, msgpack_metadata, sizeof(msgpack_metadata));
return;
]])
-- Visit each function metadata to build the case label with code generated
@ -146,8 +140,7 @@ for i = 1, #api.functions do
output:write('\n case '..fn.id..': {')
output:write('\n if (req->via.array.ptr[3].via.array.size != '..#fn.parameters..') {')
output:write('\n snprintf(error.msg, sizeof(error.msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n msgpack_rpc_error(error.msg, res);')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
-- Declare/initialize variables that will hold converted arguments
@ -165,7 +158,9 @@ for i = 1, #api.functions do
converted = 'arg_'..j
convert_arg = 'msgpack_rpc_to_'..string.lower(param[1])
output:write('\n if (!'..convert_arg..'('..arg..', &'..converted..')) {')
output:write('\n msgpack_rpc_error("Wrong type for argument '..j..', expecting '..param[1]..'", res);')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
output:write('\n error->set = true;')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
args[#args + 1] = converted
@ -196,28 +191,20 @@ for i = 1, #api.functions do
if fn.can_fail then
-- if the function can fail, also pass a pointer to the local error object
if #args > 0 then
output:write(', &error);\n')
output:write(', error);\n')
else
output:write('&error);\n')
output:write('error);\n')
end
-- and check for the error
output:write('\n if (error.set) {')
output:write('\n msgpack_rpc_error(error.msg, res);')
output:write('\n if (error->set) {')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
else
output:write(');\n')
end
-- nil error
output:write('\n msgpack_pack_nil(res);');
if fn.return_type == 'void' then
output:write('\n msgpack_pack_nil(res);');
else
output:write('\n msgpack_rpc_from_'..string.lower(fn.return_type)..'(rv, res);')
-- free the return value
output:write('\n msgpack_rpc_free_'..string.lower(fn.return_type)..'(rv);')
if fn.return_type ~= 'void' then
output:write('\n ret = '..string.upper(fn.return_type)..'_OBJ(rv);')
end
-- Now generate the cleanup label for freeing memory allocated for the
-- arguments
@ -227,7 +214,7 @@ for i = 1, #api.functions do
local param = fn.parameters[j]
output:write('\n msgpack_rpc_free_'..string.lower(param[1])..'(arg_'..j..');')
end
output:write('\n return;');
output:write('\n break;');
output:write('\n };\n');
end
@ -236,8 +223,10 @@ output:write([[
default:
msgpack_rpc_error("Invalid function id", res);
snprintf(error->msg, sizeof(error->msg), "Invalid function id");
error->set = true;
}
return ret;
}
]])
output:close()

View File

@ -65,8 +65,16 @@ typedef enum {
kObjectTypeInteger,
kObjectTypeFloat,
kObjectTypeString,
kObjectTypeBuffer,
kObjectTypeWindow,
kObjectTypeTabpage,
kObjectTypeArray,
kObjectTypeDictionary
kObjectTypeDictionary,
kObjectTypePosition,
kObjectTypeStringArray,
kObjectTypeBufferArray,
kObjectTypeWindowArray,
kObjectTypeTabpageArray,
} ObjectType;
struct object {
@ -76,8 +84,16 @@ struct object {
Integer integer;
Float floating;
String string;
Buffer buffer;
Window window;
Tabpage tabpage;
Array array;
Dictionary dictionary;
Position position;
StringArray stringarray;
BufferArray bufferarray;
WindowArray windowarray;
TabpageArray tabpagearray;
} data;
};

View File

@ -426,6 +426,8 @@ bool object_to_vim(Object obj, typval_T *tv, Error *err)
}
tv->vval.v_dict->dv_refcount++;
break;
default:
abort();
}
return true;

View File

@ -14,7 +14,9 @@
err->set = true; \
} while (0)
#define BOOL_OBJ(b) ((Object) { \
#define OBJECT_OBJ(o) o
#define BOOLEAN_OBJ(b) ((Object) { \
.type = kObjectTypeBoolean, \
.data.boolean = b \
})
@ -26,26 +28,59 @@
#define STRING_OBJ(s) ((Object) { \
.type = kObjectTypeString, \
.data.string = cstr_to_string(s) \
.data.string = s \
})
#define STRINGL_OBJ(d, s) ((Object) { \
.type = kObjectTypeString, \
.data.string = (String) { \
.size = s, \
.data = xmemdup(d, s) \
}})
#define BUFFER_OBJ(s) ((Object) { \
.type = kObjectTypeBuffer, \
.data.buffer = s \
})
#define WINDOW_OBJ(s) ((Object) { \
.type = kObjectTypeWindow, \
.data.window = s \
})
#define TABPAGE_OBJ(s) ((Object) { \
.type = kObjectTypeTabpage, \
.data.tabpage = s \
})
#define ARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeArray, \
.data.array = a \
})
#define STRINGARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeStringArray, \
.data.stringarray = a \
})
#define BUFFERARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeBufferArray, \
.data.bufferarray = a \
})
#define WINDOWARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeWindowArray, \
.data.windowarray = a \
})
#define TABPAGEARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeTabpageArray, \
.data.tabpagearray = a \
})
#define DICTIONARY_OBJ(d) ((Object) { \
.type = kObjectTypeDictionary, \
.data.dictionary = d \
})
#define POSITION_OBJ(p) ((Object) { \
.type = kObjectTypePosition, \
.data.position = p \
})
#define NIL ((Object) {.type = kObjectTypeNil})
#define PUT(dict, k, v) \

View File

@ -31,7 +31,6 @@ typedef struct {
PMap(cstr_t) *subscribed_events;
bool is_job, enabled;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
Job *job;
struct {
@ -168,7 +167,7 @@ bool channel_send_call(uint64_t id,
"Channel %" PRIu64 " was closed due to a high stack depth "
"while processing a RPC call",
channel->id);
*result = STRING_OBJ(buf);
*result = STRING_OBJ(cstr_to_string(buf));
}
uint64_t request_id = channel->next_request_id++;
@ -319,20 +318,12 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
goto end;
}
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
WBuffer *buffer = wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free);
if (!channel_write(channel, buffer)) {
WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
// write the response
if (!channel_write(channel, resp)) {
goto end;
}
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
if (result == kUnpackResultFail) {
@ -379,10 +370,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
return success;
}
static void send_error(Channel *channel, uint64_t id, char *err_msg)
static void send_error(Channel *channel, uint64_t id, char *err)
{
String err = {.size = strlen(err_msg), .data = err_msg};
channel_write(channel, serialize_response(id, err, NIL, channel->sbuffer));
channel_write(channel, serialize_response(id, err, NIL, &out_buffer));
}
static void send_request(Channel *channel,
@ -449,7 +439,6 @@ static void unsubscribe(Channel *channel, char *event)
static void close_channel(Channel *channel)
{
pmap_del(uint64_t)(channels, channel->id);
msgpack_sbuffer_free(channel->sbuffer);
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
@ -485,7 +474,6 @@ static Channel *register_channel()
rv->enabled = true;
rv->rpc_call_level = 0;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->sbuffer = msgpack_sbuffer_new();
rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
@ -530,7 +518,7 @@ static void call_stack_unwind(Channel *channel, char *msg, int count)
while (kv_size(channel->call_stack) && count--) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = true;
frame->result = STRING_OBJ(msg);
frame->result = STRING_OBJ(cstr_to_string(msg));
}
}

View File

@ -8,77 +8,53 @@
#include "nvim/os/wstream.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
#include "nvim/func_attr.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc.c.generated.h"
#endif
extern const uint8_t msgpack_metadata[];
extern const unsigned int msgpack_metadata_size;
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
/// @param id The channel id
/// @param channel_id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3)
WBuffer *msgpack_rpc_call(uint64_t channel_id,
msgpack_object *req,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(2)
FUNC_ATTR_NONNULL_ARG(3)
{
// The initial response structure is the same no matter what happens,
// we set it up here
// Array of size 4
msgpack_pack_array(res, 4);
// Response type is 1
msgpack_pack_int(res, 1);
uint64_t response_id;
char *err = msgpack_rpc_validate(&response_id, req);
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
msgpack_pack_int(res, 0); // no message id yet
msgpack_rpc_error("Request is not an array", res);
return;
if (err) {
return serialize_response(response_id, err, NIL, sbuffer);
}
if (req->via.array.size != 4) {
msgpack_pack_int(res, 0); // no message id yet
char error_msg[256];
snprintf(error_msg,
sizeof(error_msg),
"Request array size is %u, it should be 4",
req->via.array.size);
msgpack_rpc_error(error_msg, res);
return;
uint64_t method_id = req->via.array.ptr[2].via.u64;
if (method_id == 0) {
return serialize_metadata(response_id, channel_id, sbuffer);
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_pack_int(res, 0); // no message id yet
msgpack_rpc_error("Id must be a positive integer", res);
return;
// dispatch the call
Error error = { .set = false };
Object rv = msgpack_rpc_dispatch(channel_id, method_id, req, &error);
// send the response
msgpack_packer response;
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
if (error.set) {
return serialize_response(response_id, error.msg, NIL, sbuffer);
}
// Set the response id, which is the same as the request
msgpack_pack_uint64(res, req->via.array.ptr[1].via.u64);
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_rpc_error("Message type must be an integer", res);
return;
}
if (req->via.array.ptr[0].via.u64 != 0) {
msgpack_rpc_error("Message type must be 0", res);
return;
}
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_rpc_error("Method id must be a positive integer", res);
return;
}
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
msgpack_rpc_error("Paremeters must be an array", res);
return;
}
// dispatch the message
msgpack_rpc_dispatch(id, req, res);
return serialize_response(response_id, NULL, rv, sbuffer);
}
/// Try to unpack a msgpack document from the data in the unpacker buffer. This
@ -134,19 +110,19 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
}
/// Serializes a msgpack-rpc request or notification(id == 0)
WBuffer *serialize_request(uint64_t id,
WBuffer *serialize_request(uint64_t request_id,
String method,
Object arg,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ALL
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, id ? 4 : 3);
msgpack_pack_int(&pac, id ? 0 : 2);
msgpack_pack_array(&pac, request_id ? 4 : 3);
msgpack_pack_int(&pac, request_id ? 0 : 2);
if (id) {
msgpack_pack_uint64(&pac, id);
if (request_id) {
msgpack_pack_uint64(&pac, request_id);
}
msgpack_pack_raw(&pac, method.size);
@ -161,19 +137,20 @@ WBuffer *serialize_request(uint64_t id,
}
/// Serializes a msgpack-rpc response
WBuffer *serialize_response(uint64_t id,
String err,
WBuffer *serialize_response(uint64_t response_id,
char *err_msg,
Object arg,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ALL
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, 4);
msgpack_pack_int(&pac, 1);
msgpack_pack_uint64(&pac, id);
msgpack_pack_uint64(&pac, response_id);
if (err.size) {
if (err_msg) {
String err = {.size = strlen(err_msg), .data = err_msg};
// error message
msgpack_pack_raw(&pac, err.size);
msgpack_pack_raw_body(&pac, err.data, err.size);
@ -194,3 +171,66 @@ WBuffer *serialize_response(uint64_t id,
return rv;
}
WBuffer *serialize_metadata(uint64_t id,
uint64_t channel_id,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ALL
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, 4);
msgpack_pack_int(&pac, 1);
msgpack_pack_uint64(&pac, id);
// Nil error
msgpack_pack_nil(&pac);
// The result is the [channel_id, metadata] array
msgpack_pack_array(&pac, 2);
msgpack_pack_uint64(&pac, channel_id);
msgpack_pack_raw(&pac, msgpack_metadata_size);
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
free);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
static char *msgpack_rpc_validate(uint64_t *response_id, msgpack_object *req)
{
// response id not known yet
*response_id = 0;
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
return "Request is not an array";
}
if (req->via.array.size != 4) {
return "Request array size should be 4";
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
return "Id must be a positive integer";
}
// Set the response id, which is the same as the request
*response_id = req->via.array.ptr[1].via.u64;
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
return "Message type must be an integer";
}
if (req->via.array.ptr[0].via.u64 != 0) {
return "Message type must be 0";
}
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
return "Method id must be a positive integer";
}
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
return "Paremeters must be an array";
}
return NULL;
}

View File

@ -22,12 +22,15 @@ typedef enum {
/// The implementation is generated at compile time with metadata extracted
/// from the api/*.h headers,
///
/// @param id The channel id
/// @param channel_id The channel id
/// @param method_id The method id
/// @param req The parsed request object
/// @param res A packer that contains the response
void msgpack_rpc_dispatch(uint64_t id,
msgpack_object *req,
msgpack_packer *res)
/// @param err Pointer to error structure
/// @return Some object
Object msgpack_rpc_dispatch(uint64_t channel_id,
uint64_t method_id,
msgpack_object *req,
Error *err)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
#ifdef INCLUDE_GENERATED_DECLARATIONS

View File

@ -231,12 +231,41 @@ void msgpack_rpc_from_object(Object result, msgpack_packer *res)
msgpack_rpc_from_array(result.data.array, res);
break;
case kObjectTypePosition:
msgpack_rpc_from_position(result.data.position, res);
break;
case kObjectTypeBuffer:
msgpack_rpc_from_buffer(result.data.buffer, res);
break;
case kObjectTypeWindow:
msgpack_rpc_from_window(result.data.window, res);
break;
case kObjectTypeTabpage:
msgpack_rpc_from_tabpage(result.data.tabpage, res);
break;
case kObjectTypeStringArray:
msgpack_rpc_from_stringarray(result.data.stringarray, res);
break;
case kObjectTypeBufferArray:
msgpack_rpc_from_bufferarray(result.data.bufferarray, res);
break;
case kObjectTypeWindowArray:
msgpack_rpc_from_windowarray(result.data.windowarray, res);
break;
case kObjectTypeTabpageArray:
msgpack_rpc_from_tabpagearray(result.data.tabpagearray, res);
break;
case kObjectTypeDictionary:
msgpack_rpc_from_dictionary(result.data.dictionary, res);
break;
default:
abort();
}
}
@ -282,6 +311,10 @@ void msgpack_rpc_free_object(Object value)
case kObjectTypeBoolean:
case kObjectTypeInteger:
case kObjectTypeFloat:
case kObjectTypePosition:
case kObjectTypeBuffer:
case kObjectTypeWindow:
case kObjectTypeTabpage:
break;
case kObjectTypeString:
@ -292,6 +325,22 @@ void msgpack_rpc_free_object(Object value)
msgpack_rpc_free_array(value.data.array);
break;
case kObjectTypeStringArray:
msgpack_rpc_free_stringarray(value.data.stringarray);
break;
case kObjectTypeBufferArray:
msgpack_rpc_free_bufferarray(value.data.bufferarray);
break;
case kObjectTypeWindowArray:
msgpack_rpc_free_windowarray(value.data.windowarray);
break;
case kObjectTypeTabpageArray:
msgpack_rpc_free_tabpagearray(value.data.tabpagearray);
break;
case kObjectTypeDictionary:
msgpack_rpc_free_dictionary(value.data.dictionary);
break;