Merge pull request #9887 from justinmk/chan-notif-response-id

RPC: eliminate NO_RESPONSE
This commit is contained in:
Justin M. Keyes 2019-04-12 03:12:08 +02:00 committed by GitHub
commit e11a9d351a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 58 deletions

View File

@ -29,14 +29,13 @@ typedef enum {
} ErrorType; } ErrorType;
typedef enum { typedef enum {
kMessageTypeRequest, kMessageTypeUnknown = -1,
kMessageTypeResponse, // Per msgpack-rpc spec.
kMessageTypeNotification kMessageTypeRequest = 0,
kMessageTypeResponse = 1,
kMessageTypeNotification = 2,
} MessageType; } MessageType;
/// Used as the message ID of notifications.
#define NO_RESPONSE UINT64_MAX
/// Mask for all internal calls /// Mask for all internal calls
#define INTERNAL_CALL_MASK (((uint64_t)1) << (sizeof(uint64_t) * 8 - 1)) #define INTERNAL_CALL_MASK (((uint64_t)1) << (sizeof(uint64_t) * 8 - 1))

View File

@ -131,7 +131,7 @@ Object rpc_send_call(uint64_t id,
channel_incref(channel); channel_incref(channel);
RpcState *rpc = &channel->rpc; RpcState *rpc = &channel->rpc;
uint64_t request_id = rpc->next_request_id++; uint32_t request_id = rpc->next_request_id++;
// Send the msgpack-rpc request // Send the msgpack-rpc request
send_request(channel, request_id, method_name, args); send_request(channel, request_id, method_name, args);
@ -281,23 +281,26 @@ static void parse_msgpack(Channel *channel)
// A not so uncommon cause for this might be deserializing objects with // A not so uncommon cause for this might be deserializing objects with
// a high nesting level: msgpack will break when its internal parse stack // a high nesting level: msgpack will break when its internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
send_error(channel, 0, "Invalid msgpack payload. " send_error(channel, kMessageTypeRequest, 0,
"This error can also happen when deserializing " "Invalid msgpack payload. "
"an object with high level of nesting"); "This error can also happen when deserializing "
"an object with high level of nesting");
} }
} }
/// Handles requests and notifications received on the channel.
static void handle_request(Channel *channel, msgpack_object *request) static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_NONNULL_ALL
{ {
uint64_t request_id; uint32_t request_id;
Error error = ERROR_INIT; Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error); MessageType type = msgpack_rpc_validate(&request_id, request, &error);
if (ERROR_SET(&error)) { if (ERROR_SET(&error)) {
// Validation failed, send response with error // Validation failed, send response with error
if (channel_write(channel, if (channel_write(channel,
serialize_response(channel->id, serialize_response(channel->id,
type,
request_id, request_id,
&error, &error,
NIL, NIL,
@ -311,6 +314,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
api_clear_error(&error); api_clear_error(&error);
return; return;
} }
assert(type == kMessageTypeRequest || type == kMessageTypeNotification);
MsgpackRpcRequestHandler handler; MsgpackRpcRequestHandler handler;
msgpack_object *method = msgpack_rpc_method(request); msgpack_object *method = msgpack_rpc_method(request);
@ -326,13 +330,14 @@ static void handle_request(Channel *channel, msgpack_object *request)
} }
if (ERROR_SET(&error)) { if (ERROR_SET(&error)) {
send_error(channel, request_id, error.msg); send_error(channel, type, request_id, error.msg);
api_clear_error(&error); api_clear_error(&error);
api_free_array(args); api_free_array(args);
return; return;
} }
RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
evdata->type = type;
evdata->channel = channel; evdata->channel = channel;
evdata->handler = handler; evdata->handler = handler;
evdata->args = args; evdata->args = args;
@ -343,39 +348,41 @@ static void handle_request(Channel *channel, msgpack_object *request)
if (is_get_mode && !input_blocking()) { if (is_get_mode && !input_blocking()) {
// Defer the event to a special queue used by os/input.c. #6247 // Defer the event to a special queue used by os/input.c. #6247
multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata); multiqueue_put(ch_before_blocking_events, response_event, 1, evdata);
} else { } else {
// Invoke immediately. // Invoke immediately.
on_request_event((void **)&evdata); response_event((void **)&evdata);
} }
} else { } else {
multiqueue_put(channel->events, on_request_event, 1, evdata); multiqueue_put(channel->events, response_event, 1, evdata);
DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr); DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr);
} }
} }
static void on_request_event(void **argv) /// Responds to a message, depending on the type:
/// - Request: writes the response.
/// - Notification: does nothing.
static void response_event(void **argv)
{ {
RequestEvent *e = argv[0]; RequestEvent *e = argv[0];
Channel *channel = e->channel; Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler; MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
uint64_t request_id = e->request_id;
Error error = ERROR_INIT; Error error = ERROR_INIT;
Object result = handler.fn(channel->id, args, &error); Object result = handler.fn(channel->id, e->args, &error);
if (request_id != NO_RESPONSE) { if (e->type == kMessageTypeRequest) {
// send the response // Send the response.
msgpack_packer response; msgpack_packer response;
msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
channel_write(channel, serialize_response(channel->id, channel_write(channel, serialize_response(channel->id,
request_id, e->type,
e->request_id,
&error, &error,
result, result,
&out_buffer)); &out_buffer));
} else { } else {
api_free_object(result); api_free_object(result);
} }
api_free_array(args); api_free_array(e->args);
channel_decref(channel); channel_decref(channel);
xfree(e); xfree(e);
api_clear_error(&error); api_clear_error(&error);
@ -430,20 +437,21 @@ static void internal_read_event(void **argv)
wstream_release_wbuffer(buffer); wstream_release_wbuffer(buffer);
} }
static void send_error(Channel *channel, uint64_t id, char *err) static void send_error(Channel *chan, MessageType type, uint32_t id, char *err)
{ {
Error e = ERROR_INIT; Error e = ERROR_INIT;
api_set_error(&e, kErrorTypeException, "%s", err); api_set_error(&e, kErrorTypeException, "%s", err);
channel_write(channel, serialize_response(channel->id, channel_write(chan, serialize_response(chan->id,
id, type,
&e, id,
NIL, &e,
&out_buffer)); NIL,
&out_buffer));
api_clear_error(&e); api_clear_error(&e);
} }
static void send_request(Channel *channel, static void send_request(Channel *channel,
uint64_t id, uint32_t id,
const char *name, const char *name,
Array args) Array args)
{ {
@ -576,7 +584,7 @@ static bool is_rpc_response(msgpack_object *obj)
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
{ {
uint64_t response_id = obj->via.array.ptr[1].via.u64; uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64;
if (kv_size(channel->rpc.call_stack) == 0) { if (kv_size(channel->rpc.call_stack) == 0) {
return false; return false;
} }
@ -614,7 +622,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
} }
static WBuffer *serialize_request(uint64_t channel_id, static WBuffer *serialize_request(uint64_t channel_id,
uint64_t request_id, uint32_t request_id,
const String method, const String method,
Array args, Array args,
msgpack_sbuffer *sbuffer, msgpack_sbuffer *sbuffer,
@ -634,14 +642,15 @@ static WBuffer *serialize_request(uint64_t channel_id,
} }
static WBuffer *serialize_response(uint64_t channel_id, static WBuffer *serialize_response(uint64_t channel_id,
uint64_t response_id, MessageType type,
uint32_t response_id,
Error *err, Error *err,
Object arg, Object arg,
msgpack_sbuffer *sbuffer) msgpack_sbuffer *sbuffer)
{ {
msgpack_packer pac; msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
if (ERROR_SET(err) && response_id == NO_RESPONSE) { if (ERROR_SET(err) && type == kMessageTypeNotification) {
Array args = ARRAY_DICT_INIT; Array args = ARRAY_DICT_INIT;
ADD(args, INTEGER_OBJ(err->type)); ADD(args, INTEGER_OBJ(err->type));
ADD(args, STRING_OBJ(cstr_to_string(err->msg))); ADD(args, STRING_OBJ(cstr_to_string(err->msg)));

View File

@ -13,23 +13,24 @@
typedef struct Channel Channel; typedef struct Channel Channel;
typedef struct { typedef struct {
uint64_t request_id; uint32_t request_id;
bool returned, errored; bool returned, errored;
Object result; Object result;
} ChannelCallFrame; } ChannelCallFrame;
typedef struct { typedef struct {
MessageType type;
Channel *channel; Channel *channel;
MsgpackRpcRequestHandler handler; MsgpackRpcRequestHandler handler;
Array args; Array args;
uint64_t request_id; uint32_t request_id;
} RequestEvent; } RequestEvent;
typedef struct { typedef struct {
PMap(cstr_t) *subscribed_events; PMap(cstr_t) *subscribed_events;
bool closed; bool closed;
msgpack_unpacker *unpacker; msgpack_unpacker *unpacker;
uint64_t next_request_id; uint32_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack; kvec_t(ChannelCallFrame *) call_stack;
Dictionary info; Dictionary info;
} RpcState; } RpcState;

View File

@ -489,7 +489,7 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
} }
/// Serializes a msgpack-rpc request or notification(id == 0) /// Serializes a msgpack-rpc request or notification(id == 0)
void msgpack_rpc_serialize_request(uint64_t request_id, void msgpack_rpc_serialize_request(uint32_t request_id,
const String method, const String method,
Array args, Array args,
msgpack_packer *pac) msgpack_packer *pac)
@ -499,7 +499,7 @@ void msgpack_rpc_serialize_request(uint64_t request_id,
msgpack_pack_int(pac, request_id ? 0 : 2); msgpack_pack_int(pac, request_id ? 0 : 2);
if (request_id) { if (request_id) {
msgpack_pack_uint64(pac, request_id); msgpack_pack_uint32(pac, request_id);
} }
msgpack_rpc_from_string(method, pac); msgpack_rpc_from_string(method, pac);
@ -507,7 +507,7 @@ void msgpack_rpc_serialize_request(uint64_t request_id,
} }
/// Serializes a msgpack-rpc response /// Serializes a msgpack-rpc response
void msgpack_rpc_serialize_response(uint64_t response_id, void msgpack_rpc_serialize_response(uint32_t response_id,
Error *err, Error *err,
Object arg, Object arg,
msgpack_packer *pac) msgpack_packer *pac)
@ -515,7 +515,7 @@ void msgpack_rpc_serialize_response(uint64_t response_id,
{ {
msgpack_pack_array(pac, 4); msgpack_pack_array(pac, 4);
msgpack_pack_int(pac, 1); msgpack_pack_int(pac, 1);
msgpack_pack_uint64(pac, response_id); msgpack_pack_uint32(pac, response_id);
if (ERROR_SET(err)) { if (ERROR_SET(err)) {
// error represented by a [type, message] array // error represented by a [type, message] array
@ -561,58 +561,57 @@ static msgpack_object *msgpack_rpc_msg_id(msgpack_object *req)
return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ? obj : NULL; return obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ? obj : NULL;
} }
void msgpack_rpc_validate(uint64_t *response_id, MessageType msgpack_rpc_validate(uint32_t *response_id, msgpack_object *req,
msgpack_object *req, Error *err)
Error *err)
{ {
// response id not known yet *response_id = 0;
*response_id = NO_RESPONSE;
// Validate the basic structure of the msgpack-rpc payload // Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) { if (req->type != MSGPACK_OBJECT_ARRAY) {
api_set_error(err, kErrorTypeValidation, "Message is not an array"); api_set_error(err, kErrorTypeValidation, "Message is not an array");
return; return kMessageTypeUnknown;
} }
if (req->via.array.size == 0) { if (req->via.array.size == 0) {
api_set_error(err, kErrorTypeValidation, "Message is empty"); api_set_error(err, kErrorTypeValidation, "Message is empty");
return; return kMessageTypeUnknown;
} }
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
api_set_error(err, kErrorTypeValidation, "Message type must be an integer"); api_set_error(err, kErrorTypeValidation, "Message type must be an integer");
return; return kMessageTypeUnknown;
} }
uint64_t type = req->via.array.ptr[0].via.u64; MessageType type = (MessageType)req->via.array.ptr[0].via.u64;
if (type != kMessageTypeRequest && type != kMessageTypeNotification) { if (type != kMessageTypeRequest && type != kMessageTypeNotification) {
api_set_error(err, kErrorTypeValidation, "Unknown message type"); api_set_error(err, kErrorTypeValidation, "Unknown message type");
return; return kMessageTypeUnknown;
} }
if ((type == kMessageTypeRequest && req->via.array.size != 4) if ((type == kMessageTypeRequest && req->via.array.size != 4)
|| (type == kMessageTypeNotification && req->via.array.size != 3)) { || (type == kMessageTypeNotification && req->via.array.size != 3)) {
api_set_error(err, kErrorTypeValidation, api_set_error(err, kErrorTypeValidation,
"Request array size must be 4 (request) or 3 (notification)"); "Request array size must be 4 (request) or 3 (notification)");
return; return type;
} }
if (type == kMessageTypeRequest) { if (type == kMessageTypeRequest) {
msgpack_object *id_obj = msgpack_rpc_msg_id(req); msgpack_object *id_obj = msgpack_rpc_msg_id(req);
if (!id_obj) { if (!id_obj) {
api_set_error(err, kErrorTypeValidation, "ID must be a positive integer"); api_set_error(err, kErrorTypeValidation, "ID must be a positive integer");
return; return type;
} }
*response_id = id_obj->via.u64; *response_id = (uint32_t)id_obj->via.u64;
} }
if (!msgpack_rpc_method(req)) { if (!msgpack_rpc_method(req)) {
api_set_error(err, kErrorTypeValidation, "Method must be a string"); api_set_error(err, kErrorTypeValidation, "Method must be a string");
return; return type;
} }
if (!msgpack_rpc_args(req)) { if (!msgpack_rpc_args(req)) {
api_set_error(err, kErrorTypeValidation, "Parameters must be an array"); api_set_error(err, kErrorTypeValidation, "Parameters must be an array");
return; return type;
} }
return type;
} }