Merge pull request #872 'Add support for client-side RPC'

This commit is contained in:
Thiago de Arruda 2014-06-24 13:47:36 -03:00
commit 9f1b9726fb
16 changed files with 1135 additions and 679 deletions

View File

@ -91,6 +91,8 @@ output:write([[
#include <msgpack.h>
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
]])
for i = 1, #headers do
@ -120,20 +122,13 @@ output:write([[
};
const unsigned int msgpack_metadata_size = sizeof(msgpack_metadata);
void msgpack_rpc_dispatch(uint64_t channel_id, msgpack_object *req, msgpack_packer *res)
Object msgpack_rpc_dispatch(uint64_t channel_id,
uint64_t method_id,
msgpack_object *req,
Error *error)
{
Error error = { .set = false };
uint64_t method_id = (uint32_t)req->via.array.ptr[2].via.u64;
Object ret = NIL;
switch (method_id) {
case 0:
msgpack_pack_nil(res);
// The result is the [channel_id, metadata] array
msgpack_pack_array(res, 2);
msgpack_pack_uint64(res, channel_id);
msgpack_pack_raw(res, sizeof(msgpack_metadata));
msgpack_pack_raw_body(res, msgpack_metadata, sizeof(msgpack_metadata));
return;
]])
-- Visit each function metadata to build the case label with code generated
@ -145,8 +140,7 @@ for i = 1, #api.functions do
output:write('\n case '..fn.id..': {')
output:write('\n if (req->via.array.ptr[3].via.array.size != '..#fn.parameters..') {')
output:write('\n snprintf(error.msg, sizeof(error.msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n msgpack_rpc_error(error.msg, res);')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
-- Declare/initialize variables that will hold converted arguments
@ -164,7 +158,9 @@ for i = 1, #api.functions do
converted = 'arg_'..j
convert_arg = 'msgpack_rpc_to_'..string.lower(param[1])
output:write('\n if (!'..convert_arg..'('..arg..', &'..converted..')) {')
output:write('\n msgpack_rpc_error("Wrong type for argument '..j..', expecting '..param[1]..'", res);')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
output:write('\n error->set = true;')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
args[#args + 1] = converted
@ -195,28 +191,20 @@ for i = 1, #api.functions do
if fn.can_fail then
-- if the function can fail, also pass a pointer to the local error object
if #args > 0 then
output:write(', &error);\n')
output:write(', error);\n')
else
output:write('&error);\n')
output:write('error);\n')
end
-- and check for the error
output:write('\n if (error.set) {')
output:write('\n msgpack_rpc_error(error.msg, res);')
output:write('\n if (error->set) {')
output:write('\n goto '..cleanup_label..';')
output:write('\n }\n')
else
output:write(');\n')
end
-- nil error
output:write('\n msgpack_pack_nil(res);');
if fn.return_type == 'void' then
output:write('\n msgpack_pack_nil(res);');
else
output:write('\n msgpack_rpc_from_'..string.lower(fn.return_type)..'(rv, res);')
-- free the return value
output:write('\n msgpack_rpc_free_'..string.lower(fn.return_type)..'(rv);')
if fn.return_type ~= 'void' then
output:write('\n ret = '..string.upper(fn.return_type)..'_OBJ(rv);')
end
-- Now generate the cleanup label for freeing memory allocated for the
-- arguments
@ -226,7 +214,7 @@ for i = 1, #api.functions do
local param = fn.parameters[j]
output:write('\n msgpack_rpc_free_'..string.lower(param[1])..'(arg_'..j..');')
end
output:write('\n return;');
output:write('\n break;');
output:write('\n };\n');
end
@ -235,8 +223,10 @@ output:write([[
default:
msgpack_rpc_error("Invalid function id", res);
snprintf(error->msg, sizeof(error->msg), "Invalid function id");
error->set = true;
}
return ret;
}
]])
output:close()

View File

@ -65,8 +65,16 @@ typedef enum {
kObjectTypeInteger,
kObjectTypeFloat,
kObjectTypeString,
kObjectTypeBuffer,
kObjectTypeWindow,
kObjectTypeTabpage,
kObjectTypeArray,
kObjectTypeDictionary
kObjectTypeDictionary,
kObjectTypePosition,
kObjectTypeStringArray,
kObjectTypeBufferArray,
kObjectTypeWindowArray,
kObjectTypeTabpageArray,
} ObjectType;
struct object {
@ -76,8 +84,16 @@ struct object {
Integer integer;
Float floating;
String string;
Buffer buffer;
Window window;
Tabpage tabpage;
Array array;
Dictionary dictionary;
Position position;
StringArray stringarray;
BufferArray bufferarray;
WindowArray windowarray;
TabpageArray tabpagearray;
} data;
};

View File

@ -341,7 +341,7 @@ String cstr_to_string(const char *str)
};
}
static bool object_to_vim(Object obj, typval_T *tv, Error *err)
bool object_to_vim(Object obj, typval_T *tv, Error *err)
{
tv->v_type = VAR_UNKNOWN;
tv->v_lock = 0;
@ -426,6 +426,8 @@ static bool object_to_vim(Object obj, typval_T *tv, Error *err)
}
tv->vval.v_dict->dv_refcount++;
break;
default:
abort();
}
return true;

View File

@ -14,7 +14,9 @@
err->set = true; \
} while (0)
#define BOOL_OBJ(b) ((Object) { \
#define OBJECT_OBJ(o) o
#define BOOLEAN_OBJ(b) ((Object) { \
.type = kObjectTypeBoolean, \
.data.boolean = b \
})
@ -26,26 +28,59 @@
#define STRING_OBJ(s) ((Object) { \
.type = kObjectTypeString, \
.data.string = cstr_to_string(s) \
.data.string = s \
})
#define STRINGL_OBJ(d, s) ((Object) { \
.type = kObjectTypeString, \
.data.string = (String) { \
.size = s, \
.data = xmemdup(d, s) \
}})
#define BUFFER_OBJ(s) ((Object) { \
.type = kObjectTypeBuffer, \
.data.buffer = s \
})
#define WINDOW_OBJ(s) ((Object) { \
.type = kObjectTypeWindow, \
.data.window = s \
})
#define TABPAGE_OBJ(s) ((Object) { \
.type = kObjectTypeTabpage, \
.data.tabpage = s \
})
#define ARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeArray, \
.data.array = a \
})
#define STRINGARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeStringArray, \
.data.stringarray = a \
})
#define BUFFERARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeBufferArray, \
.data.bufferarray = a \
})
#define WINDOWARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeWindowArray, \
.data.windowarray = a \
})
#define TABPAGEARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeTabpageArray, \
.data.tabpagearray = a \
})
#define DICTIONARY_OBJ(d) ((Object) { \
.type = kObjectTypeDictionary, \
.data.dictionary = d \
})
#define POSITION_OBJ(p) ((Object) { \
.type = kObjectTypePosition, \
.data.position = p \
})
#define NIL ((Object) {.type = kObjectTypeNil})
#define PUT(dict, k, v) \

View File

@ -424,8 +424,8 @@ void vim_set_current_tabpage(Tabpage tabpage, Error *err)
/// @param event The event type string
void vim_subscribe(uint64_t channel_id, String event)
{
size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN);
char e[EVENT_MAXLEN + 1];
size_t length = (event.size < METHOD_MAXLEN ? event.size : METHOD_MAXLEN);
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_subscribe(channel_id, e);
@ -437,8 +437,10 @@ void vim_subscribe(uint64_t channel_id, String event)
/// @param event The event type string
void vim_unsubscribe(uint64_t channel_id, String event)
{
size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN);
char e[EVENT_MAXLEN + 1];
size_t length = (event.size < METHOD_MAXLEN ?
event.size :
METHOD_MAXLEN);
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_unsubscribe(channel_id, e);

View File

@ -71,6 +71,7 @@
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
#include "nvim/api/private/helpers.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@ -6453,6 +6454,7 @@ static struct fst {
{"searchpair", 3, 7, f_searchpair},
{"searchpairpos", 3, 7, f_searchpairpos},
{"searchpos", 1, 4, f_searchpos},
{"send_call", 3, 3, f_send_call},
{"send_event", 3, 3, f_send_event},
{"setbufvar", 3, 3, f_setbufvar},
{"setcmdpos", 1, 1, f_setcmdpos},
@ -10474,6 +10476,7 @@ static void f_job_start(typval_T *argvars, typval_T *rettv)
on_job_stderr,
on_job_exit,
true,
0,
&rettv->vval.v_number);
if (rettv->vval.v_number <= 0) {
@ -10535,6 +10538,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
if (!job) {
// Invalid job id
EMSG(_(e_invjob));
return;
}
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
@ -12523,6 +12527,47 @@ do_searchpair (
return retval;
}
// "send_call()" function
static void f_send_call(typval_T *argvars, typval_T *rettv)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) {
EMSG2(_(e_invarg2), "Channel id must be a positive integer");
return;
}
if (argvars[1].v_type != VAR_STRING) {
EMSG2(_(e_invarg2), "Method name must be a string");
return;
}
bool errored;
Object result;
if (!channel_send_call((uint64_t)argvars[0].vval.v_number,
(char *)argvars[1].vval.v_string,
vim_to_object(&argvars[2]),
&result,
&errored)) {
EMSG2(_(e_invarg2), "Channel doesn't exist");
return;
}
Error conversion_error = {.set = false};
if (errored || !object_to_vim(result, rettv, &conversion_error)) {
EMSG(errored ?
result.data.string.data :
_("Error converting the call result"));
}
msgpack_rpc_free_object(result);
}
// "send_event()" function
static void f_send_event(typval_T *argvars, typval_T *rettv)
{

View File

@ -5,6 +5,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h"
#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
@ -12,17 +13,24 @@
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/map.h"
#include "nvim/lib/kvec.h"
typedef struct {
uint64_t request_id;
bool errored;
Object result;
} ChannelCallFrame;
typedef struct {
uint64_t id;
PMap(cstr_t) *subscribed_events;
bool is_job;
bool is_job, enabled;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
Job *job;
struct {
@ -31,12 +39,15 @@ typedef struct {
uv_stream_t *uv;
} streams;
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
size_t rpc_call_level;
} Channel;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer msgpack_event_buffer;
static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/channel.c.generated.h"
@ -47,7 +58,7 @@ void channel_init()
{
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&msgpack_event_buffer);
msgpack_sbuffer_init(&out_buffer);
}
/// Teardown the module
@ -80,6 +91,7 @@ bool channel_from_job(char **argv)
job_err,
job_exit,
true,
0,
&status);
if (status <= 0) {
@ -104,7 +116,7 @@ void channel_from_stream(uv_stream_t *stream)
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
channel->data.streams.write = wstream_new(1024 * 1024);
channel->data.streams.write = wstream_new(0);
wstream_set_stream(channel->data.streams.write, stream);
channel->data.streams.uv = stream;
}
@ -113,26 +125,98 @@ void channel_from_stream(uv_stream_t *stream)
///
/// @param id The channel id. If 0, the event will be sent to all
/// channels that have subscribed to the event type
/// @param type The event type, an arbitrary string
/// @param obj The event data
/// @param name The event name, an arbitrary string
/// @param arg The event arg
/// @return True if the data was sent successfully, false otherwise.
bool channel_send_event(uint64_t id, char *type, Object data)
bool channel_send_event(uint64_t id, char *name, Object arg)
{
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id))) {
msgpack_rpc_free_object(data);
msgpack_rpc_free_object(arg);
return false;
}
send_event(channel, type, data);
send_event(channel, name, arg);
} else {
broadcast_event(type, data);
broadcast_event(name, arg);
}
return true;
}
bool channel_send_call(uint64_t id,
char *name,
Object arg,
Object *result,
bool *errored)
{
Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
msgpack_rpc_free_object(arg);
return false;
}
if (kv_size(channel->call_stack) > 20) {
// 20 stack depth is more than anyone should ever need for RPC calls
*errored = true;
char buf[256];
snprintf(buf,
sizeof(buf),
"Channel %" PRIu64 " was closed due to a high stack depth "
"while processing a RPC call",
channel->id);
*result = STRING_OBJ(cstr_to_string(buf));
}
uint64_t request_id = channel->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, name, arg);
if (!kv_size(channel->call_stack)) {
// This is the first frame, we must disable event deferral for this
// channel because we won't be returning until the client sends a
// response
if (channel->is_job) {
job_set_defer(channel->data.job, false);
} else {
rstream_set_defer(channel->data.streams.read, false);
}
}
// Push the frame
ChannelCallFrame frame = {request_id, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
size_t size = kv_size(channel->call_stack);
do {
event_poll(-1);
} while (
// Continue running if ...
channel->enabled && // the channel is still enabled
kv_size(channel->call_stack) >= size); // the call didn't return
if (!kv_size(channel->call_stack)) {
// Popped last frame, restore event deferral
if (channel->is_job) {
job_set_defer(channel->data.job, true);
} else {
rstream_set_defer(channel->data.streams.read, true);
}
if (!channel->enabled && !channel->rpc_call_level) {
// Close the channel if it has been disabled and we have not been called
// by `parse_msgpack`(It would be unsafe to close the channel otherwise)
close_channel(channel);
}
}
*errored = frame.errored;
*result = frame.result;
return true;
}
/// Subscribes to event broadcasts
///
/// @param id The channel id
@ -191,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
Channel *channel = data;
if (eof) {
close_channel(channel);
char buf[256];
snprintf(buf,
sizeof(buf),
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
disable_channel(channel, buf);
return;
}
channel->rpc_call_level++;
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
@ -205,23 +296,34 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
UnpackResult result;
msgpack_packer response;
// Deserialize everything we can.
while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
== kUnpackResultOk) {
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free));
if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
if (is_valid_rpc_response(&unpacked.data, channel)) {
call_stack_pop(&unpacked.data, channel);
} else {
char buf[256];
snprintf(buf,
sizeof(buf),
"Channel %" PRIu64 " returned a response that doesn't have "
" a matching id for the current RPC call. Ensure the client "
" is properly synchronized",
channel->id);
call_stack_unwind(channel, buf, 1);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
goto end;
}
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
// Perform the call
WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
// write the response
if (!channel_write(channel, resp)) {
goto end;
}
}
if (result == kUnpackResultFail) {
@ -231,50 +333,87 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// A not so uncommon cause for this might be deserializing objects with
// a high nesting level: msgpack will break when it's internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&response, 4);
msgpack_pack_int(&response, 1);
msgpack_pack_int(&response, 0);
msgpack_rpc_error("Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting",
&response);
wstream_write(channel->data.streams.write,
wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
send_error(channel, 0, "Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
}
end:
channel->rpc_call_level--;
if (!channel->enabled && !kv_size(channel->call_stack)) {
// Now it's safe to destroy the channel
close_channel(channel);
}
}
static void send_event(Channel *channel, char *type, Object data)
static bool channel_write(Channel *channel, WBuffer *buffer)
{
wstream_write(channel->data.streams.write, serialize_event(type, data));
bool success;
if (channel->is_job) {
success = job_write(channel->data.job, buffer);
} else {
success = wstream_write(channel->data.streams.write, buffer);
}
if (!success) {
// If the write failed for any reason, close the channel
char buf[256];
snprintf(buf,
sizeof(buf),
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed due to a failed write",
channel->id);
disable_channel(channel, buf);
}
return success;
}
static void broadcast_event(char *type, Object data)
static void send_error(Channel *channel, uint64_t id, char *err)
{
channel_write(channel, serialize_response(id, err, NIL, &out_buffer));
}
static void send_request(Channel *channel,
uint64_t id,
char *name,
Object arg)
{
String method = {.size = strlen(name), .data = name};
channel_write(channel, serialize_request(id, method, arg, &out_buffer));
}
static void send_event(Channel *channel,
char *name,
Object arg)
{
String method = {.size = strlen(name), .data = name};
channel_write(channel, serialize_request(0, method, arg, &out_buffer));
}
static void broadcast_event(char *name, Object arg)
{
kvec_t(Channel *) subscribed;
kv_init(subscribed);
Channel *channel;
map_foreach_value(channels, channel, {
if (pmap_has(cstr_t)(channel->subscribed_events, type)) {
if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
kv_push(Channel *, subscribed, channel);
}
});
if (!kv_size(subscribed)) {
msgpack_rpc_free_object(data);
msgpack_rpc_free_object(arg);
goto end;
}
WBuffer *buffer = serialize_event(type, data);
String method = {.size = strlen(name), .data = name};
WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
for (size_t i = 0; i < kv_size(subscribed); i++) {
wstream_write(kv_A(subscribed, i)->data.streams.write, buffer);
channel_write(kv_A(subscribed, i), buffer);
}
end:
@ -300,7 +439,6 @@ static void unsubscribe(Channel *channel, char *event)
static void close_channel(Channel *channel)
{
pmap_del(uint64_t)(channels, channel->id);
msgpack_sbuffer_free(channel->sbuffer);
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
@ -320,6 +458,7 @@ static void close_channel(Channel *channel)
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
free(channel);
}
@ -329,29 +468,69 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
static WBuffer *serialize_event(char *type, Object data)
{
String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(event_type, data, &packer);
WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data,
msgpack_event_buffer.size),
msgpack_event_buffer.size,
free);
msgpack_rpc_free_object(data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return rv;
}
static Channel *register_channel()
{
Channel *rv = xmalloc(sizeof(Channel));
rv->enabled = true;
rv->rpc_call_level = 0;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->sbuffer = msgpack_sbuffer_new();
rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
pmap_put(uint64_t)(channels, rv->id, rv);
return rv;
}
static bool is_rpc_response(msgpack_object *obj)
{
return obj->type == MSGPACK_OBJECT_ARRAY
&& obj->via.array.size == 4
&& obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
&& obj->via.array.ptr[0].via.u64 == 1
&& obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
}
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
{
uint64_t response_id = obj->via.array.ptr[1].via.u64;
// Must be equal to the frame at the stack's bottom
return response_id == kv_A(channel->call_stack,
kv_size(channel->call_stack) - 1)->request_id;
}
static void call_stack_pop(msgpack_object *obj, Channel *channel)
{
ChannelCallFrame *frame = kv_A(channel->call_stack,
kv_size(channel->call_stack) - 1);
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
(void)kv_pop(channel->call_stack);
if (frame->errored) {
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
} else {
msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
}
}
static void call_stack_unwind(Channel *channel, char *msg, int count)
{
while (kv_size(channel->call_stack) && count--) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
}
static void disable_channel(Channel *channel, char *msg)
{
if (kv_size(channel->call_stack)) {
// Channel is currently in the middle of a call, remove all frames and mark
// it as "dead"
channel->enabled = false;
call_stack_unwind(channel, msg, -1);
} else {
// Safe to close it now
close_channel(channel);
}
}

View File

@ -6,7 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/vim.h"
#define EVENT_MAXLEN 512
#define METHOD_MAXLEN 512
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/channel.h.generated.h"

View File

@ -63,11 +63,6 @@ bool event_poll(int32_t ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
if (input_ready()) {
// If there's a pending input event to be consumed, do it now
return true;
}
static int recursive = 0;
if (!(recursive++)) {
@ -95,17 +90,16 @@ bool event_poll(int32_t ms)
run_mode = UV_RUN_NOWAIT;
}
bool events_processed;
do {
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
uv_run(uv_default_loop(), run_mode);
// Process immediate events outside uv_run since libuv event loop not
// support recursion(processing events may cause a recursive event_poll
// call)
event_process(false);
events_processed = event_process(false);
} while (
// Continue running if ...
!input_ready() && // we have no input
!events_processed && // we didn't process any immediate events
!event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
!timer_data.timed_out); // we didn't get a timeout
@ -124,7 +118,7 @@ bool event_poll(int32_t ms)
event_process(false);
}
return input_ready() || event_has_deferred();
return !timer_data.timed_out && (events_processed || event_has_deferred());
}
bool event_has_deferred()
@ -139,11 +133,13 @@ void event_push(Event event, bool deferred)
}
// Runs the appropriate action for each queued event
void event_process(bool deferred)
bool event_process(bool deferred)
{
bool processed_events = false;
Event event;
while (kl_shift(Event, get_queue(deferred), &event) == 0) {
processed_events = true;
switch (event.type) {
case kEventSignal:
signal_handle(event);
@ -158,6 +154,8 @@ void event_process(bool deferred)
abort();
}
}
return processed_events;
}
// Set a flag in the `event_poll` loop for signaling of a timeout

View File

@ -37,12 +37,6 @@ void input_init()
rstream_set_file(read_stream, read_cmd_fd);
}
// Check if there's pending input
bool input_ready()
{
return rstream_available(read_stream) > 0 || eof;
}
// Listen for input
void input_start()
{
@ -119,7 +113,7 @@ bool os_char_avail()
// In cooked mode we should get SIGINT, no need to check.
void os_breakcheck()
{
if (curr_tmode == TMODE_RAW && event_poll(0))
if (curr_tmode == TMODE_RAW && input_poll(0))
fill_input_buf(false);
}
@ -132,6 +126,11 @@ bool os_isatty(int fd)
return uv_guess_handle(fd) == UV_TTY;
}
static bool input_poll(int32_t ms)
{
return input_ready() || event_poll(ms) || input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c
static InbufPollResult inbuf_poll(int32_t ms)
{
@ -139,7 +138,7 @@ static InbufPollResult inbuf_poll(int32_t ms)
return kInputAvail;
}
if (event_poll(ms)) {
if (input_poll(ms)) {
return eof && rstream_available(read_stream) == 0 ?
kInputEof :
kInputAvail;
@ -196,3 +195,10 @@ static int push_event_key(uint8_t *buf, int maxlen)
return buf_idx;
}
// Check if there's pending input
bool input_ready()
{
return rstream_available(read_stream) > 0 || eof;
}

View File

@ -21,7 +21,6 @@
#define EXIT_TIMEOUT 25
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 1024
#define JOB_WRITE_MAXMEM 1024 * 1024
struct job {
// Job id the index in the job table plus one.
@ -131,6 +130,7 @@ void job_teardown()
/// @param exit_cb Callback that will be invoked when the job exits
/// @param defer If the job callbacks invocation should be deferred to vim
/// main loop
/// @param maxmem Maximum amount of memory used by the job WStream
/// @param[out] The job id if the job started successfully, 0 if the job table
/// is full, -1 if the program could not be executed.
/// @return The job pointer if the job started successfully, NULL otherwise
@ -140,6 +140,7 @@ Job *job_start(char **argv,
rstream_cb stderr_cb,
job_exit_cb job_exit_cb,
bool defer,
size_t maxmem,
int *status)
{
int i;
@ -210,7 +211,7 @@ Job *job_start(char **argv,
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
handle_set_job((uv_handle_t *)&job->proc_stderr, job);
job->in = wstream_new(JOB_WRITE_MAXMEM);
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);

View File

@ -3,405 +3,73 @@
#include <msgpack.h>
#include "nvim/os/msgpack_rpc.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/os/wstream.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
#include "nvim/func_attr.h"
#define REMOTE_FUNCS_IMPL(t, lt) \
bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
{ \
*arg = obj->via.u64; \
return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \
} \
\
void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \
{ \
msgpack_pack_uint64(res, result); \
}
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc.c.generated.h"
#endif
#define TYPED_ARRAY_IMPL(t, lt) \
bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \
{ \
if (obj->type != MSGPACK_OBJECT_ARRAY) { \
return false; \
} \
\
arg->size = obj->via.array.size; \
arg->items = xcalloc(obj->via.array.size, sizeof(t)); \
\
for (size_t i = 0; i < obj->via.array.size; i++) { \
if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \
return false; \
} \
} \
\
return true; \
} \
\
void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \
{ \
msgpack_pack_array(res, result.size); \
\
for (size_t i = 0; i < result.size; i++) { \
msgpack_rpc_from_##lt(result.items[i], res); \
} \
} \
\
void msgpack_rpc_free_##lt##array(t##Array value) { \
for (size_t i = 0; i < value.size; i++) { \
msgpack_rpc_free_##lt(value.items[i]); \
} \
\
free(value.items); \
}
extern const uint8_t msgpack_metadata[];
extern const unsigned int msgpack_metadata_size;
void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
/// @param channel_id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
WBuffer *msgpack_rpc_call(uint64_t channel_id,
msgpack_object *req,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(2)
FUNC_ATTR_NONNULL_ARG(3)
{
// The initial response structure is the same no matter what happens,
// we set it up here
// Array of size 4
msgpack_pack_array(res, 4);
// Response type is 1
msgpack_pack_int(res, 1);
uint64_t response_id;
char *err = msgpack_rpc_validate(&response_id, req);
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
msgpack_pack_int(res, 0); // no message id yet
msgpack_rpc_error("Request is not an array", res);
return;
if (err) {
return serialize_response(response_id, err, NIL, sbuffer);
}
if (req->via.array.size != 4) {
msgpack_pack_int(res, 0); // no message id yet
char error_msg[256];
snprintf(error_msg,
sizeof(error_msg),
"Request array size is %u, it should be 4",
req->via.array.size);
msgpack_rpc_error(error_msg, res);
return;
uint64_t method_id = req->via.array.ptr[2].via.u64;
if (method_id == 0) {
return serialize_metadata(response_id, channel_id, sbuffer);
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_pack_int(res, 0); // no message id yet
msgpack_rpc_error("Id must be a positive integer", res);
return;
// dispatch the call
Error error = { .set = false };
Object rv = msgpack_rpc_dispatch(channel_id, method_id, req, &error);
// send the response
msgpack_packer response;
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
if (error.set) {
return serialize_response(response_id, error.msg, NIL, sbuffer);
}
// Set the response id, which is the same as the request
msgpack_pack_uint64(res, req->via.array.ptr[1].via.u64);
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_rpc_error("Message type must be an integer", res);
return;
}
if (req->via.array.ptr[0].via.u64 != 0) {
msgpack_rpc_error("Message type must be 0", res);
return;
}
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_rpc_error("Method id must be a positive integer", res);
return;
}
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
msgpack_rpc_error("Paremeters must be an array", res);
return;
}
// dispatch the message
msgpack_rpc_dispatch(id, req, res);
}
void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac)
{
msgpack_pack_array(pac, 3);
msgpack_pack_int(pac, 2);
msgpack_pack_raw(pac, type.size);
msgpack_pack_raw_body(pac, type.data, type.size);
msgpack_rpc_from_object(data, pac);
}
void msgpack_rpc_error(char *msg, msgpack_packer *res)
{
size_t len = strlen(msg);
// error message
msgpack_pack_raw(res, len);
msgpack_pack_raw_body(res, msg, len);
// Nil result
msgpack_pack_nil(res);
}
bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
{
*arg = obj->via.boolean;
return obj->type == MSGPACK_OBJECT_BOOLEAN;
}
bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
{
if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
&& obj->via.u64 <= INT64_MAX) {
*arg = (int64_t)obj->via.u64;
return true;
}
*arg = obj->via.i64;
return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
}
bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
{
*arg = obj->via.dec;
return obj->type == MSGPACK_OBJECT_DOUBLE;
}
bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
{
if (obj->type != MSGPACK_OBJECT_RAW) {
return false;
}
arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size);
arg->size = obj->via.raw.size;
return true;
}
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
{
switch (obj->type) {
case MSGPACK_OBJECT_NIL:
arg->type = kObjectTypeNil;
return true;
case MSGPACK_OBJECT_BOOLEAN:
arg->type = kObjectTypeBoolean;
return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
case MSGPACK_OBJECT_POSITIVE_INTEGER:
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
arg->type = kObjectTypeInteger;
return msgpack_rpc_to_integer(obj, &arg->data.integer);
case MSGPACK_OBJECT_DOUBLE:
arg->type = kObjectTypeFloat;
return msgpack_rpc_to_float(obj, &arg->data.floating);
case MSGPACK_OBJECT_RAW:
arg->type = kObjectTypeString;
return msgpack_rpc_to_string(obj, &arg->data.string);
case MSGPACK_OBJECT_ARRAY:
arg->type = kObjectTypeArray;
return msgpack_rpc_to_array(obj, &arg->data.array);
case MSGPACK_OBJECT_MAP:
arg->type = kObjectTypeDictionary;
return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
default:
return false;
}
}
bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
{
return obj->type == MSGPACK_OBJECT_ARRAY
&& obj->via.array.size == 2
&& msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row)
&& msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col);
}
bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
{
if (obj->type != MSGPACK_OBJECT_ARRAY) {
return false;
}
arg->size = obj->via.array.size;
arg->items = xcalloc(obj->via.array.size, sizeof(Object));
for (uint32_t i = 0; i < obj->via.array.size; i++) {
if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
return false;
}
}
return true;
}
bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
{
if (obj->type != MSGPACK_OBJECT_MAP) {
return false;
}
arg->size = obj->via.array.size;
arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
for (uint32_t i = 0; i < obj->via.map.size; i++) {
if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
&arg->items[i].key)) {
return false;
}
if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
&arg->items[i].value)) {
return false;
}
}
return true;
}
void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
{
if (result) {
msgpack_pack_true(res);
} else {
msgpack_pack_false(res);
}
}
void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
{
msgpack_pack_int64(res, result);
}
void msgpack_rpc_from_float(Float result, msgpack_packer *res)
{
msgpack_pack_double(res, result);
}
void msgpack_rpc_from_string(String result, msgpack_packer *res)
{
msgpack_pack_raw(res, result.size);
msgpack_pack_raw_body(res, result.data, result.size);
}
void msgpack_rpc_from_object(Object result, msgpack_packer *res)
{
switch (result.type) {
case kObjectTypeNil:
msgpack_pack_nil(res);
break;
case kObjectTypeBoolean:
msgpack_rpc_from_boolean(result.data.boolean, res);
break;
case kObjectTypeInteger:
msgpack_rpc_from_integer(result.data.integer, res);
break;
case kObjectTypeFloat:
msgpack_rpc_from_float(result.data.floating, res);
break;
case kObjectTypeString:
msgpack_rpc_from_string(result.data.string, res);
break;
case kObjectTypeArray:
msgpack_rpc_from_array(result.data.array, res);
break;
case kObjectTypeDictionary:
msgpack_rpc_from_dictionary(result.data.dictionary, res);
break;
default:
abort();
}
}
void msgpack_rpc_from_position(Position result, msgpack_packer *res)
{
msgpack_pack_array(res, 2);;
msgpack_pack_int64(res, result.row);
msgpack_pack_int64(res, result.col);
}
void msgpack_rpc_from_array(Array result, msgpack_packer *res)
{
msgpack_pack_array(res, result.size);
for (size_t i = 0; i < result.size; i++) {
msgpack_rpc_from_object(result.items[i], res);
}
}
void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
{
msgpack_pack_map(res, result.size);
for (size_t i = 0; i < result.size; i++) {
msgpack_rpc_from_string(result.items[i].key, res);
msgpack_rpc_from_object(result.items[i].value, res);
}
}
void msgpack_rpc_free_string(String value)
{
if (!value.data) {
return;
}
free(value.data);
}
void msgpack_rpc_free_object(Object value)
{
switch (value.type) {
case kObjectTypeNil:
case kObjectTypeBoolean:
case kObjectTypeInteger:
case kObjectTypeFloat:
break;
case kObjectTypeString:
msgpack_rpc_free_string(value.data.string);
break;
case kObjectTypeArray:
msgpack_rpc_free_array(value.data.array);
break;
case kObjectTypeDictionary:
msgpack_rpc_free_dictionary(value.data.dictionary);
break;
default:
abort();
}
}
void msgpack_rpc_free_array(Array value)
{
for (uint32_t i = 0; i < value.size; i++) {
msgpack_rpc_free_object(value.items[i]);
}
free(value.items);
}
void msgpack_rpc_free_dictionary(Dictionary value)
{
for (uint32_t i = 0; i < value.size; i++) {
msgpack_rpc_free_string(value.items[i].key);
msgpack_rpc_free_object(value.items[i].value);
}
free(value.items);
return serialize_response(response_id, NULL, rv, sbuffer);
}
/// Try to unpack a msgpack document from the data in the unpacker buffer. This
/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets
/// the called know if the unpacking failed due to bad input or due to missing
/// data.
///
/// @param unpacker The unpacker containing the parse buffer
/// @param result The result which will contain the parsed object
/// @return kUnpackResultOk : An object was parsed
/// kUnpackResultFail : Got bad input
/// kUnpackResultNeedMore: Need more data
UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
msgpack_unpacked* result)
FUNC_ATTR_NONNULL_ALL
{
if (result->zone != NULL) {
msgpack_zone_free(result->zone);
@ -425,12 +93,144 @@ UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
return kUnpackResultNeedMore;
}
REMOTE_FUNCS_IMPL(Buffer, buffer)
REMOTE_FUNCS_IMPL(Window, window)
REMOTE_FUNCS_IMPL(Tabpage, tabpage)
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
/// @param res A packer that contains the response
void msgpack_rpc_error(char *msg, msgpack_packer *res)
FUNC_ATTR_NONNULL_ALL
{
size_t len = strlen(msg);
TYPED_ARRAY_IMPL(Buffer, buffer)
TYPED_ARRAY_IMPL(Window, window)
TYPED_ARRAY_IMPL(Tabpage, tabpage)
TYPED_ARRAY_IMPL(String, string)
// error message
msgpack_pack_raw(res, len);
msgpack_pack_raw_body(res, msg, len);
// Nil result
msgpack_pack_nil(res);
}
/// Serializes a msgpack-rpc request or notification(id == 0)
WBuffer *serialize_request(uint64_t request_id,
String method,
Object arg,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, request_id ? 4 : 3);
msgpack_pack_int(&pac, request_id ? 0 : 2);
if (request_id) {
msgpack_pack_uint64(&pac, request_id);
}
msgpack_pack_raw(&pac, method.size);
msgpack_pack_raw_body(&pac, method.data, method.size);
msgpack_rpc_from_object(arg, &pac);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
/// Serializes a msgpack-rpc response
WBuffer *serialize_response(uint64_t response_id,
char *err_msg,
Object arg,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, 4);
msgpack_pack_int(&pac, 1);
msgpack_pack_uint64(&pac, response_id);
if (err_msg) {
String err = {.size = strlen(err_msg), .data = err_msg};
// error message
msgpack_pack_raw(&pac, err.size);
msgpack_pack_raw_body(&pac, err.data, err.size);
// Nil result
msgpack_pack_nil(&pac);
} else {
// Nil error
msgpack_pack_nil(&pac);
// Return value
msgpack_rpc_from_object(arg, &pac);
}
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
WBuffer *serialize_metadata(uint64_t id,
uint64_t channel_id,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ALL
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, 4);
msgpack_pack_int(&pac, 1);
msgpack_pack_uint64(&pac, id);
// Nil error
msgpack_pack_nil(&pac);
// The result is the [channel_id, metadata] array
msgpack_pack_array(&pac, 2);
msgpack_pack_uint64(&pac, channel_id);
msgpack_pack_raw(&pac, msgpack_metadata_size);
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
free);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
static char *msgpack_rpc_validate(uint64_t *response_id, msgpack_object *req)
{
// response id not known yet
*response_id = 0;
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
return "Request is not an array";
}
if (req->via.array.size != 4) {
return "Request array size should be 4";
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
return "Id must be a positive integer";
}
// Set the response id, which is the same as the request
*response_id = req->via.array.ptr[1].via.u64;
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
return "Message type must be an integer";
}
if (req->via.array.ptr[0].via.u64 != 0) {
return "Message type must be 0";
}
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
return "Method id must be a positive integer";
}
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
return "Paremeters must be an array";
}
return NULL;
}

View File

@ -8,6 +8,7 @@
#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
#include "nvim/os/wstream.h"
typedef enum {
kUnpackResultOk, /// Successfully parsed a document
@ -15,167 +16,26 @@ typedef enum {
kUnpackResultNeedMore /// Need more data
} UnpackResult;
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
/// @param id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
/// Packs a notification message
///
/// @param type The message type, an arbitrary string
/// @param data The notification data
/// @param packer Where the notification will be packed to
void msgpack_rpc_notification(String type, Object data, msgpack_packer *pac)
FUNC_ATTR_NONNULL_ARG(3);
/// Dispatches to the actual API function after basic payload validation by
/// `msgpack_rpc_call`. It is responsible for validating/converting arguments
/// to C types, and converting the return value back to msgpack types.
/// The implementation is generated at compile time with metadata extracted
/// from the api/*.h headers,
///
/// @param id The channel id
/// @param channel_id The channel id
/// @param method_id The method id
/// @param req The parsed request object
/// @param res A packer that contains the response
void msgpack_rpc_dispatch(uint64_t id,
msgpack_object *req,
msgpack_packer *res)
/// @param err Pointer to error structure
/// @return Some object
Object msgpack_rpc_dispatch(uint64_t channel_id,
uint64_t method_id,
msgpack_object *req,
Error *err)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
/// Try to unpack a msgpack document from the data in the unpacker buffer. This
/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets
/// the called know if the unpacking failed due to bad input or due to missing
/// data.
///
/// @param unpacker The unpacker containing the parse buffer
/// @param result The result which will contain the parsed object
/// @return kUnpackResultOk : An object was parsed
/// kUnpackResultFail : Got bad input
/// kUnpackResultNeedMore: Need more data
UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
msgpack_unpacked* result);
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
/// @param res A packer that contains the response
void msgpack_rpc_error(char *msg, msgpack_packer *res)
FUNC_ATTR_NONNULL_ALL;
/// Functions for validating and converting from msgpack types to C types.
/// These are used by `msgpack_rpc_dispatch` to validate and convert each
/// argument.
///
/// @param obj The object to convert
/// @param[out] arg A pointer to the avalue
/// @return true if the conversion succeeded, false otherwise
bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
FUNC_ATTR_NONNULL_ALL;
/// Functions for converting from C types to msgpack types.
/// These are used by `msgpack_rpc_dispatch` to convert return values
/// from the API
///
/// @param result A pointer to the result
/// @param res A packer that contains the response
void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_float(Float result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_position(Position result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_string(String result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_window(Window result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_object(Object result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_array(Array result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
/// Helpers for initializing types that may be freed later
#define msgpack_rpc_init_boolean
#define msgpack_rpc_init_integer
#define msgpack_rpc_init_float
#define msgpack_rpc_init_position
#define msgpack_rpc_init_string = STRING_INIT
#define msgpack_rpc_init_buffer
#define msgpack_rpc_init_window
#define msgpack_rpc_init_tabpage
#define msgpack_rpc_init_object = {.type = kObjectTypeNil}
#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT
#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT
#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT
#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT
#define msgpack_rpc_init_array = ARRAY_DICT_INIT
#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT
/// Helpers for freeing arguments/return value
///
/// @param value The value to be freed
#define msgpack_rpc_free_boolean(value)
#define msgpack_rpc_free_integer(value)
#define msgpack_rpc_free_float(value)
#define msgpack_rpc_free_position(value)
void msgpack_rpc_free_string(String value);
#define msgpack_rpc_free_buffer(value)
#define msgpack_rpc_free_window(value)
#define msgpack_rpc_free_tabpage(value)
void msgpack_rpc_free_object(Object value);
void msgpack_rpc_free_stringarray(StringArray value);
void msgpack_rpc_free_bufferarray(BufferArray value);
void msgpack_rpc_free_windowarray(WindowArray value);
void msgpack_rpc_free_tabpagearray(TabpageArray value);
void msgpack_rpc_free_array(Array value);
void msgpack_rpc_free_dictionary(Dictionary value);
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc.h.generated.h"
#endif
#endif // NVIM_OS_MSGPACK_RPC_H

View File

@ -0,0 +1,380 @@
#include <stdint.h>
#include <stdbool.h>
#include <msgpack.h>
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#define REMOTE_FUNCS_IMPL(t, lt) \
bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
{ \
*arg = obj->via.u64; \
return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER; \
} \
\
void msgpack_rpc_from_##lt(t result, msgpack_packer *res) \
{ \
msgpack_pack_uint64(res, result); \
}
#define TYPED_ARRAY_IMPL(t, lt) \
bool msgpack_rpc_to_##lt##array(msgpack_object *obj, t##Array *arg) \
{ \
if (obj->type != MSGPACK_OBJECT_ARRAY) { \
return false; \
} \
\
arg->size = obj->via.array.size; \
arg->items = xcalloc(obj->via.array.size, sizeof(t)); \
\
for (size_t i = 0; i < obj->via.array.size; i++) { \
if (!msgpack_rpc_to_##lt(obj->via.array.ptr + i, &arg->items[i])) { \
return false; \
} \
} \
\
return true; \
} \
\
void msgpack_rpc_from_##lt##array(t##Array result, msgpack_packer *res) \
{ \
msgpack_pack_array(res, result.size); \
\
for (size_t i = 0; i < result.size; i++) { \
msgpack_rpc_from_##lt(result.items[i], res); \
} \
} \
\
void msgpack_rpc_free_##lt##array(t##Array value) { \
for (size_t i = 0; i < value.size; i++) { \
msgpack_rpc_free_##lt(value.items[i]); \
} \
\
free(value.items); \
}
bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
{
*arg = obj->via.boolean;
return obj->type == MSGPACK_OBJECT_BOOLEAN;
}
bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
{
if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
&& obj->via.u64 <= INT64_MAX) {
*arg = (int64_t)obj->via.u64;
return true;
}
*arg = obj->via.i64;
return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
}
bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
{
*arg = obj->via.dec;
return obj->type == MSGPACK_OBJECT_DOUBLE;
}
bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
{
if (obj->type != MSGPACK_OBJECT_RAW) {
return false;
}
arg->data = xmemdupz(obj->via.raw.ptr, obj->via.raw.size);
arg->size = obj->via.raw.size;
return true;
}
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
{
switch (obj->type) {
case MSGPACK_OBJECT_NIL:
arg->type = kObjectTypeNil;
return true;
case MSGPACK_OBJECT_BOOLEAN:
arg->type = kObjectTypeBoolean;
return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
case MSGPACK_OBJECT_POSITIVE_INTEGER:
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
arg->type = kObjectTypeInteger;
return msgpack_rpc_to_integer(obj, &arg->data.integer);
case MSGPACK_OBJECT_DOUBLE:
arg->type = kObjectTypeFloat;
return msgpack_rpc_to_float(obj, &arg->data.floating);
case MSGPACK_OBJECT_RAW:
arg->type = kObjectTypeString;
return msgpack_rpc_to_string(obj, &arg->data.string);
case MSGPACK_OBJECT_ARRAY:
arg->type = kObjectTypeArray;
return msgpack_rpc_to_array(obj, &arg->data.array);
case MSGPACK_OBJECT_MAP:
arg->type = kObjectTypeDictionary;
return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
default:
return false;
}
}
bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
{
return obj->type == MSGPACK_OBJECT_ARRAY
&& obj->via.array.size == 2
&& msgpack_rpc_to_integer(obj->via.array.ptr, &arg->row)
&& msgpack_rpc_to_integer(obj->via.array.ptr + 1, &arg->col);
}
bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
{
if (obj->type != MSGPACK_OBJECT_ARRAY) {
return false;
}
arg->size = obj->via.array.size;
arg->items = xcalloc(obj->via.array.size, sizeof(Object));
for (uint32_t i = 0; i < obj->via.array.size; i++) {
if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
return false;
}
}
return true;
}
bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
{
if (obj->type != MSGPACK_OBJECT_MAP) {
return false;
}
arg->size = obj->via.array.size;
arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
for (uint32_t i = 0; i < obj->via.map.size; i++) {
if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
&arg->items[i].key)) {
return false;
}
if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
&arg->items[i].value)) {
return false;
}
}
return true;
}
void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
{
if (result) {
msgpack_pack_true(res);
} else {
msgpack_pack_false(res);
}
}
void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
{
msgpack_pack_int64(res, result);
}
void msgpack_rpc_from_float(Float result, msgpack_packer *res)
{
msgpack_pack_double(res, result);
}
void msgpack_rpc_from_string(String result, msgpack_packer *res)
{
msgpack_pack_raw(res, result.size);
msgpack_pack_raw_body(res, result.data, result.size);
}
void msgpack_rpc_from_object(Object result, msgpack_packer *res)
{
switch (result.type) {
case kObjectTypeNil:
msgpack_pack_nil(res);
break;
case kObjectTypeBoolean:
msgpack_rpc_from_boolean(result.data.boolean, res);
break;
case kObjectTypeInteger:
msgpack_rpc_from_integer(result.data.integer, res);
break;
case kObjectTypeFloat:
msgpack_rpc_from_float(result.data.floating, res);
break;
case kObjectTypeString:
msgpack_rpc_from_string(result.data.string, res);
break;
case kObjectTypeArray:
msgpack_rpc_from_array(result.data.array, res);
break;
case kObjectTypePosition:
msgpack_rpc_from_position(result.data.position, res);
break;
case kObjectTypeBuffer:
msgpack_rpc_from_buffer(result.data.buffer, res);
break;
case kObjectTypeWindow:
msgpack_rpc_from_window(result.data.window, res);
break;
case kObjectTypeTabpage:
msgpack_rpc_from_tabpage(result.data.tabpage, res);
break;
case kObjectTypeStringArray:
msgpack_rpc_from_stringarray(result.data.stringarray, res);
break;
case kObjectTypeBufferArray:
msgpack_rpc_from_bufferarray(result.data.bufferarray, res);
break;
case kObjectTypeWindowArray:
msgpack_rpc_from_windowarray(result.data.windowarray, res);
break;
case kObjectTypeTabpageArray:
msgpack_rpc_from_tabpagearray(result.data.tabpagearray, res);
break;
case kObjectTypeDictionary:
msgpack_rpc_from_dictionary(result.data.dictionary, res);
break;
}
}
void msgpack_rpc_from_position(Position result, msgpack_packer *res)
{
msgpack_pack_array(res, 2);;
msgpack_pack_int64(res, result.row);
msgpack_pack_int64(res, result.col);
}
void msgpack_rpc_from_array(Array result, msgpack_packer *res)
{
msgpack_pack_array(res, result.size);
for (size_t i = 0; i < result.size; i++) {
msgpack_rpc_from_object(result.items[i], res);
}
}
void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
{
msgpack_pack_map(res, result.size);
for (size_t i = 0; i < result.size; i++) {
msgpack_rpc_from_string(result.items[i].key, res);
msgpack_rpc_from_object(result.items[i].value, res);
}
}
void msgpack_rpc_free_string(String value)
{
if (!value.data) {
return;
}
free(value.data);
}
void msgpack_rpc_free_object(Object value)
{
switch (value.type) {
case kObjectTypeNil:
case kObjectTypeBoolean:
case kObjectTypeInteger:
case kObjectTypeFloat:
case kObjectTypePosition:
case kObjectTypeBuffer:
case kObjectTypeWindow:
case kObjectTypeTabpage:
break;
case kObjectTypeString:
msgpack_rpc_free_string(value.data.string);
break;
case kObjectTypeArray:
msgpack_rpc_free_array(value.data.array);
break;
case kObjectTypeStringArray:
msgpack_rpc_free_stringarray(value.data.stringarray);
break;
case kObjectTypeBufferArray:
msgpack_rpc_free_bufferarray(value.data.bufferarray);
break;
case kObjectTypeWindowArray:
msgpack_rpc_free_windowarray(value.data.windowarray);
break;
case kObjectTypeTabpageArray:
msgpack_rpc_free_tabpagearray(value.data.tabpagearray);
break;
case kObjectTypeDictionary:
msgpack_rpc_free_dictionary(value.data.dictionary);
break;
default:
abort();
}
}
void msgpack_rpc_free_array(Array value)
{
for (uint32_t i = 0; i < value.size; i++) {
msgpack_rpc_free_object(value.items[i]);
}
free(value.items);
}
void msgpack_rpc_free_dictionary(Dictionary value)
{
for (uint32_t i = 0; i < value.size; i++) {
msgpack_rpc_free_string(value.items[i].key);
msgpack_rpc_free_object(value.items[i].value);
}
free(value.items);
}
REMOTE_FUNCS_IMPL(Buffer, buffer)
REMOTE_FUNCS_IMPL(Window, window)
REMOTE_FUNCS_IMPL(Tabpage, tabpage)
TYPED_ARRAY_IMPL(Buffer, buffer)
TYPED_ARRAY_IMPL(Window, window)
TYPED_ARRAY_IMPL(Tabpage, tabpage)
TYPED_ARRAY_IMPL(String, string)

View File

@ -0,0 +1,124 @@
#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H
#define NVIM_OS_MSGPACK_RPC_HELPERS_H
#include <stdint.h>
#include <stdbool.h>
#include <msgpack.h>
#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
/// Functions for validating and converting from msgpack types to C types.
/// These are used by `msgpack_rpc_dispatch` to validate and convert each
/// argument.
///
/// @param obj The object to convert
/// @param[out] arg A pointer to the avalue
/// @return true if the conversion succeeded, false otherwise
bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_position(msgpack_object *obj, Position *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_buffer(msgpack_object *obj, Buffer *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_window(msgpack_object *obj, Window *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_tabpage(msgpack_object *obj, Tabpage *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_stringarray(msgpack_object *obj, StringArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_bufferarray(msgpack_object *obj, BufferArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_windowarray(msgpack_object *obj, WindowArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_tabpagearray(msgpack_object *obj, TabpageArray *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
FUNC_ATTR_NONNULL_ALL;
bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
FUNC_ATTR_NONNULL_ALL;
/// Functions for converting from C types to msgpack types.
/// These are used by `msgpack_rpc_dispatch` to convert return values
/// from the API
///
/// @param result A pointer to the result
/// @param res A packer that contains the response
void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_float(Float result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_position(Position result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_string(String result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_buffer(Buffer result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_window(Window result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_tabpage(Tabpage result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_object(Object result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_stringarray(StringArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_bufferarray(BufferArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_windowarray(WindowArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_tabpagearray(TabpageArray result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_array(Array result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2);
/// Helpers for initializing types that may be freed later
#define msgpack_rpc_init_boolean
#define msgpack_rpc_init_integer
#define msgpack_rpc_init_float
#define msgpack_rpc_init_position
#define msgpack_rpc_init_string = STRING_INIT
#define msgpack_rpc_init_buffer
#define msgpack_rpc_init_window
#define msgpack_rpc_init_tabpage
#define msgpack_rpc_init_object = {.type = kObjectTypeNil}
#define msgpack_rpc_init_stringarray = ARRAY_DICT_INIT
#define msgpack_rpc_init_bufferarray = ARRAY_DICT_INIT
#define msgpack_rpc_init_windowarray = ARRAY_DICT_INIT
#define msgpack_rpc_init_tabpagearray = ARRAY_DICT_INIT
#define msgpack_rpc_init_array = ARRAY_DICT_INIT
#define msgpack_rpc_init_dictionary = ARRAY_DICT_INIT
/// Helpers for freeing arguments/return value
///
/// @param value The value to be freed
#define msgpack_rpc_free_boolean(value)
#define msgpack_rpc_free_integer(value)
#define msgpack_rpc_free_float(value)
#define msgpack_rpc_free_position(value)
void msgpack_rpc_free_string(String value);
#define msgpack_rpc_free_buffer(value)
#define msgpack_rpc_free_window(value)
#define msgpack_rpc_free_tabpage(value)
void msgpack_rpc_free_object(Object value);
void msgpack_rpc_free_stringarray(StringArray value);
void msgpack_rpc_free_bufferarray(BufferArray value);
void msgpack_rpc_free_windowarray(WindowArray value);
void msgpack_rpc_free_tabpagearray(TabpageArray value);
void msgpack_rpc_free_array(Array value);
void msgpack_rpc_free_dictionary(Dictionary value);
#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H

View File

@ -9,6 +9,8 @@
#include "nvim/vim.h"
#include "nvim/memory.h"
#define DEFAULT_MAXMEM 1024 * 1024 * 10
struct wstream {
uv_stream_t *stream;
// Memory currently used by pending buffers
@ -43,6 +45,10 @@ typedef struct {
/// @return The newly-allocated `WStream` instance
WStream * wstream_new(size_t maxmem)
{
if (!maxmem) {
maxmem = DEFAULT_MAXMEM;
}
WStream *rv = xmalloc(sizeof(WStream));
rv->maxmem = maxmem;
rv->stream = NULL;
@ -91,11 +97,12 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
// This should not be called after a wstream was freed
assert(!wstream->freed);
buffer->refcount++;
if (wstream->curmem > wstream->maxmem) {
return false;
goto err;
}
buffer->refcount++;
wstream->curmem += buffer->size;
data = xmalloc(sizeof(WriteData));
data->wstream = wstream;
@ -105,9 +112,16 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
wstream->pending_reqs++;
uv_write(req, wstream->stream, &uvbuf, 1, write_cb);
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
goto err;
}
return true;
err:
release_wbuffer(buffer);
return false;
}
/// Creates a WBuffer object for holding output data. Instances of this
@ -138,10 +152,7 @@ static void write_cb(uv_write_t *req, int status)
free(req);
data->wstream->curmem -= data->buffer->size;
if (!--data->buffer->refcount) {
data->buffer->cb(data->buffer->data);
free(data->buffer);
}
release_wbuffer(data->buffer);
data->wstream->pending_reqs--;
if (data->wstream->freed && data->wstream->pending_reqs == 0) {
@ -152,3 +163,10 @@ static void write_cb(uv_write_t *req, int status)
free(data);
}
static void release_wbuffer(WBuffer *buffer)
{
if (!--buffer->refcount) {
buffer->cb(buffer->data);
free(buffer);
}
}