Merge PR #1316 'Refactor event deferral'

This commit is contained in:
Thiago de Arruda 2014-10-22 07:30:32 -03:00
commit 5cd9b64742
40 changed files with 680 additions and 815 deletions

View File

@ -2,8 +2,6 @@
set_environment /opt/neovim-deps/64
install_functional_test_deps
sudo pip install cpp-coveralls
clang_version=3.4

View File

@ -47,12 +47,6 @@ install_prebuilt_deps() {
fi
}
install_functional_test_deps() {
sudo pip install git+https://github.com/neovim/python-client.git
# Pass -E to let pip use PKG_CONFIG_PATH for luajit
sudo -E pip install lupa
}
tmpdir="$(pwd)/tmp"
rm -rf "$tmpdir"
mkdir -p "$tmpdir"

View File

@ -1,10 +1,5 @@
. "$CI_SCRIPTS/common.sh"
# To install lupa, a temporarary functional test dependency, we require the
# 64-bit luajit since travis version of python is 64-bit.
export PKG_CONFIG_PATH="/opt/neovim-deps/64/usr/lib/pkgconfig"
install_functional_test_deps
set_environment /opt/neovim-deps/32
# Need this to keep apt-get from removing gcc when installing libncurses

View File

@ -2,8 +2,6 @@
set_environment /opt/neovim-deps/64
install_functional_test_deps
sudo pip install cpp-coveralls
sudo apt-get install valgrind

View File

@ -32,8 +32,6 @@ src/nvim/os/job.c
src/nvim/os/job.h
src/nvim/os/job_defs.h
src/nvim/os/mem.c
src/nvim/os/msgpack_rpc.c
src/nvim/os/msgpack_rpc.h
src/nvim/os/os.h
src/nvim/os/rstream.c
src/nvim/os/rstream.h
@ -44,10 +42,12 @@ src/nvim/os/signal.c
src/nvim/os/signal.h
src/nvim/os/time.c
src/nvim/os/time.h
src/nvim/os/server.c
src/nvim/os/server.h
src/nvim/os/channel.c
src/nvim/os/channel.h
src/nvim/msgpack_rpc/server.c
src/nvim/msgpack_rpc/server.h
src/nvim/msgpack_rpc/channel.c
src/nvim/msgpack_rpc/channel.h
src/nvim/msgpack_rpc/helpers.c
src/nvim/msgpack_rpc/helpers.h
src/nvim/tempfile.c
src/nvim/tempfile.h
src/nvim/profile.c

View File

@ -34,6 +34,8 @@ c_params = Ct(c_void + c_param_list)
c_proto = Ct(
Cg(c_type, 'return_type') * Cg(c_id, 'name') *
fill * P('(') * fill * Cg(c_params, 'parameters') * fill * P(')') *
Cg(Cc(false), 'deferred') *
(fill * Cg((P('FUNC_ATTR_DEFERRED') * Cc(true)), 'deferred') ^ -1) *
fill * P(';')
)
grammar = Ct((c_proto + c_comment + c_preproc + ws) ^ 1)
@ -92,8 +94,8 @@ output:write([[
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/vim.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/msgpack_rpc/defs.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
]])
@ -159,9 +161,10 @@ for i = 1, #functions do
local fn = functions[i]
local args = {}
output:write('static Object handle_'..fn.name..'(uint64_t channel_id, msgpack_object *req, Error *error)')
output:write('static Object handle_'..fn.name..'(uint64_t channel_id, uint64_t request_id, Array args, Error *error)')
output:write('\n{')
output:write('\n DLOG("Received msgpack-rpc call to '..fn.name..'(request id: %" PRIu64 ")", req->via.array.ptr[1].via.u64);')
output:write('\n DLOG("Handling msgpack-rpc call to '..fn.name..'(request id: %" PRIu64 ")", request_id);')
output:write('\n Object ret = NIL;')
-- Declare/initialize variables that will hold converted arguments
for j = 1, #fn.parameters do
local param = fn.parameters[j]
@ -169,8 +172,8 @@ for i = 1, #functions do
output:write('\n '..param[1]..' '..converted..' api_init_'..string.lower(real_type(param[1]))..';')
end
output:write('\n')
output:write('\n if (req->via.array.ptr[3].via.array.size != '..#fn.parameters..') {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %u", req->via.array.ptr[3].via.array.size);')
output:write('\n if (args.size != '..#fn.parameters..') {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong number of arguments: expecting '..#fn.parameters..' but got %zu", args.size);')
output:write('\n error->set = true;')
output:write('\n goto cleanup;')
output:write('\n }\n')
@ -179,14 +182,18 @@ for i = 1, #functions do
for j = 1, #fn.parameters do
local converted, convert_arg, param, arg
param = fn.parameters[j]
arg = '(req->via.array.ptr[3].via.array.ptr + '..(j - 1)..')'
converted = 'arg_'..j
convert_arg = 'msgpack_rpc_to_'..real_type(param[1]):lower()
output:write('\n if (!'..convert_arg..'('..arg..', &'..converted..')) {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
output:write('\n error->set = true;')
output:write('\n goto cleanup;')
output:write('\n }\n')
if real_type(param[1]) ~= 'Object' then
output:write('\n if (args.items['..(j - 1)..'].type != kObjectType'..real_type(param[1])..') {')
output:write('\n snprintf(error->msg, sizeof(error->msg), "Wrong type for argument '..j..', expecting '..param[1]..'");')
output:write('\n error->set = true;')
output:write('\n goto cleanup;')
output:write('\n }')
output:write('\n '..converted..' = args.items['..(j - 1)..'].data.'..real_type(param[1]):lower()..';\n')
else
output:write('\n '..converted..' = args.items['..(j - 1)..'];\n')
end
args[#args + 1] = converted
end
@ -228,7 +235,7 @@ for i = 1, #functions do
end
if fn.return_type ~= 'void' then
output:write('\n Object ret = '..string.upper(real_type(fn.return_type))..'_OBJ(rv);')
output:write('\n ret = '..string.upper(real_type(fn.return_type))..'_OBJ(rv);')
end
-- Now generate the cleanup label for freeing memory allocated for the
-- arguments
@ -238,20 +245,16 @@ for i = 1, #functions do
local param = fn.parameters[j]
output:write('\n api_free_'..string.lower(real_type(param[1]))..'(arg_'..j..');')
end
if fn.return_type ~= 'void' then
output:write('\n return ret;\n}\n\n');
else
output:write('\n return NIL;\n}\n\n');
end
output:write('\n return ret;\n}\n\n');
end
-- Generate a function that initializes method names with handler functions
output:write([[
static Map(String, rpc_method_handler_fn) *methods = NULL;
static Map(String, MsgpackRpcRequestHandler) *methods = NULL;
void msgpack_rpc_init(void)
void msgpack_rpc_init_method_table(void)
{
methods = map_new(String, rpc_method_handler_fn)();
methods = map_new(String, MsgpackRpcRequestHandler)();
]])
@ -260,10 +263,11 @@ void msgpack_rpc_init(void)
local max_fname_len = 0
for i = 1, #functions do
local fn = functions[i]
output:write(' map_put(String, rpc_method_handler_fn)(methods, '..
output:write(' map_put(String, MsgpackRpcRequestHandler)(methods, '..
'(String) {.data = "'..fn.name..'", '..
'.size = sizeof("'..fn.name..'") - 1}, handle_'..
fn.name..');\n')
'.size = sizeof("'..fn.name..'") - 1}, '..
'(MsgpackRpcRequestHandler) {.fn = handle_'.. fn.name..
', .defer = '..tostring(fn.deferred)..'});\n')
if #fn.name > max_fname_len then
max_fname_len = #fn.name
@ -273,26 +277,21 @@ end
output:write('\n}\n\n')
output:write([[
Object msgpack_rpc_dispatch(uint64_t channel_id,
msgpack_object *req,
Error *error)
MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name,
size_t name_len)
{
msgpack_object method = req->via.array.ptr[2];
rpc_method_handler_fn handler = NULL;
String m = {
.data=(char *)name,
.size=min(name_len, ]]..max_fname_len..[[)
};
MsgpackRpcRequestHandler rv =
map_get(String, MsgpackRpcRequestHandler)(methods, m);
if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
]])
output:write(' handler = map_get(String, rpc_method_handler_fn)')
output:write('(methods, (String){.data=(char *)method.via.bin.ptr,')
output:write('.size=min(method.via.bin.size, '..max_fname_len..')});\n')
output:write([[
if (!rv.fn) {
rv.fn = msgpack_rpc_handle_missing_method;
}
if (!handler) {
handler = msgpack_rpc_handle_missing_method;
}
return handler(channel_id, req, error);
return rv;
}
]])

View File

@ -3,7 +3,7 @@ include(CheckLibraryExists)
set(GENERATED_DIR ${PROJECT_BINARY_DIR}/src/nvim/auto)
set(DISPATCH_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/msgpack-gen.lua)
file(GLOB API_HEADERS api/*.h)
set(MSGPACK_RPC_HEADER ${PROJECT_SOURCE_DIR}/src/nvim/os/msgpack_rpc.h)
file(GLOB MSGPACK_RPC_HEADERS msgpack_rpc/*.h)
set(MSGPACK_DISPATCH ${GENERATED_DIR}/msgpack_dispatch.c)
set(HEADER_GENERATOR ${PROJECT_SOURCE_DIR}/scripts/gendeclarations.lua)
set(GENERATED_INCLUDES_DIR ${PROJECT_BINARY_DIR}/include)
@ -19,12 +19,14 @@ file(MAKE_DIRECTORY ${GENERATED_DIR})
file(MAKE_DIRECTORY ${GENERATED_DIR}/os)
file(MAKE_DIRECTORY ${GENERATED_DIR}/api)
file(MAKE_DIRECTORY ${GENERATED_DIR}/api/private)
file(MAKE_DIRECTORY ${GENERATED_DIR}/msgpack_rpc)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR})
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/os)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/api/private)
file(MAKE_DIRECTORY ${GENERATED_INCLUDES_DIR}/msgpack_rpc)
file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c)
file(GLOB NEOVIM_SOURCES *.c os/*.c api/*.c api/private/*.c msgpack_rpc/*.c)
file(GLOB_RECURSE NEOVIM_HEADERS *.h)
foreach(sfile ${NEOVIM_SOURCES})
@ -36,8 +38,7 @@ endforeach()
list(REMOVE_ITEM NEOVIM_SOURCES ${to_remove})
set(CONV_SRCS
api.c
set(CONV_SOURCES
arabic.c
cursor.c
garray.c
@ -46,31 +47,24 @@ set(CONV_SRCS
map.c
memory.c
misc2.c
map.c
profile.c
os/env.c
os/event.c
os/job.c
os/mem.c
os/rstream.c
os/signal.c
os/users.c
os/provider.c
os/uv_helpers.c
os/wstream.c
os/msgpack_rpc.c
tempfile.c
api/buffer.c
api/private/helpers.c
api/private/handle.c
api/tabpage.c
api/window.c
api/vim.h
api/vim.c
)
foreach(sfile ${CONV_SOURCES})
if(NOT EXISTS "${PROJECT_SOURCE_DIR}/src/nvim/${sfile}")
message(FATAL_ERROR "${sfile} doesn't exist(it was added to CONV_SOURCES)")
endif()
endforeach()
file(GLOB_RECURSE EXTRA_CONV_SOURCES os/*.c api/*.c msgpack_rpc/*.c)
foreach(sfile ${EXTRA_CONV_SOURCES})
file(RELATIVE_PATH f "${PROJECT_SOURCE_DIR}/src/nvim" "${sfile}")
list(APPEND CONV_SOURCES ${f})
endforeach()
set_source_files_properties(
${CONV_SRCS} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion")
${CONV_SOURCES} PROPERTIES COMPILE_FLAGS "${COMPILE_FLAGS} -Wconversion")
if(CMAKE_C_COMPILER_ID MATCHES "Clang")
if(DEFINED ENV{SANITIZE})
@ -126,7 +120,7 @@ add_custom_command(OUTPUT ${MSGPACK_DISPATCH}
COMMAND ${LUA_PRG} ${DISPATCH_GENERATOR} ${API_HEADERS} ${MSGPACK_DISPATCH}
DEPENDS
${API_HEADERS}
${MSGPACK_RPC_HEADER}
${MSGPACK_RPC_HEADERS}
${DISPATCH_GENERATOR}
)

View File

@ -69,6 +69,7 @@ String buffer_get_line(Buffer buffer, Integer index, Error *err)
/// @param line The new line.
/// @param[out] err Details of an error that may have occurred
void buffer_set_line(Buffer buffer, Integer index, String line, Error *err)
FUNC_ATTR_DEFERRED
{
Object l = STRING_OBJ(line);
Array array = {.items = &l, .size = 1};
@ -81,6 +82,7 @@ void buffer_set_line(Buffer buffer, Integer index, String line, Error *err)
/// @param index The line index
/// @param[out] err Details of an error that may have occurred
void buffer_del_line(Buffer buffer, Integer index, Error *err)
FUNC_ATTR_DEFERRED
{
Array array = ARRAY_DICT_INIT;
buffer_set_line_slice(buffer, index, index, true, true, array, err);
@ -163,6 +165,7 @@ void buffer_set_line_slice(Buffer buffer,
Boolean include_end,
ArrayOf(String) replacement,
Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -314,6 +317,7 @@ Object buffer_get_var(Buffer buffer, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The old value
Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -349,6 +353,7 @@ Object buffer_get_option(Buffer buffer, String name, Error *err)
/// @param value The option value
/// @param[out] err Details of an error that may have occurred
void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -399,6 +404,7 @@ String buffer_get_name(Buffer buffer, Error *err)
/// @param name The buffer name
/// @param[out] err Details of an error that may have occurred
void buffer_set_name(Buffer buffer, String name, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -444,6 +450,7 @@ void buffer_insert(Buffer buffer,
Integer lnum,
ArrayOf(String) lines,
Error *err)
FUNC_ATTR_DEFERRED
{
buffer_set_line_slice(buffer, lnum, lnum, false, true, lines, err);
}

View File

@ -62,6 +62,7 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The tab page handle
Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
tabpage_T *tab = find_tab_by_handle(tabpage, err);

View File

@ -10,7 +10,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/api/buffer.h"
#include "nvim/os/channel.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/provider.h"
#include "nvim/vim.h"
#include "nvim/buffer.h"
@ -31,19 +31,12 @@
# include "api/vim.c.generated.h"
#endif
/// Send keys to vim input buffer, simulating user input.
///
/// @param str The keys to send
void vim_push_keys(String str)
{
abort();
}
/// Executes an ex-mode command str
///
/// @param str The command str
/// @param[out] err Details of an error that may have occurred
void vim_command(String str, Error *err)
FUNC_ATTR_DEFERRED
{
// Run the command
try_start();
@ -111,6 +104,7 @@ String vim_replace_termcodes(String str, Boolean from_part, Boolean do_lt,
/// @param[out] err Details of an error that may have occurred
/// @return The expanded object
Object vim_eval(String str, Error *err)
FUNC_ATTR_DEFERRED
{
Object rv;
// Evaluate the expression
@ -230,6 +224,7 @@ String vim_get_current_line(Error *err)
/// @param line The line contents
/// @param[out] err Details of an error that may have occurred
void vim_set_current_line(String line, Error *err)
FUNC_ATTR_DEFERRED
{
buffer_set_line(curbuf->handle, curwin->w_cursor.lnum - 1, line, err);
}
@ -238,6 +233,7 @@ void vim_set_current_line(String line, Error *err)
///
/// @param[out] err Details of an error that may have occurred
void vim_del_current_line(Error *err)
FUNC_ATTR_DEFERRED
{
buffer_del_line(curbuf->handle, curwin->w_cursor.lnum - 1, err);
}
@ -259,6 +255,7 @@ Object vim_get_var(String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return the old value if any
Object vim_set_var(String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
return dict_set_value(&globvardict, name, value, err);
}
@ -289,6 +286,7 @@ Object vim_get_option(String name, Error *err)
/// @param value The new option value
/// @param[out] err Details of an error that may have occurred
void vim_set_option(String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
set_option_to(NULL, SREQ_GLOBAL, name, value, err);
}
@ -297,6 +295,7 @@ void vim_set_option(String name, Object value, Error *err)
///
/// @param str The message
void vim_out_write(String str)
FUNC_ATTR_DEFERRED
{
write_msg(str, false);
}
@ -305,6 +304,7 @@ void vim_out_write(String str)
///
/// @param str The message
void vim_err_write(String str)
FUNC_ATTR_DEFERRED
{
write_msg(str, true);
}
@ -314,6 +314,7 @@ void vim_err_write(String str)
///
/// @param str The message
void vim_report_error(String str)
FUNC_ATTR_DEFERRED
{
vim_err_write(str);
vim_err_write((String) {.data = "\n", .size = 1});
@ -357,6 +358,7 @@ Buffer vim_get_current_buffer(void)
/// @param id The buffer handle
/// @param[out] err Details of an error that may have occurred
void vim_set_current_buffer(Buffer buffer, Error *err)
FUNC_ATTR_DEFERRED
{
buf_T *buf = find_buffer_by_handle(buffer, err);
@ -407,6 +409,7 @@ Window vim_get_current_window(void)
///
/// @param handle The window handle
void vim_set_current_window(Window window, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -462,6 +465,7 @@ Tabpage vim_get_current_tabpage(void)
/// @param handle The tab page handle
/// @param[out] err Details of an error that may have occurred
void vim_set_current_tabpage(Tabpage tabpage, Error *err)
FUNC_ATTR_DEFERRED
{
tabpage_T *tp = find_tab_by_handle(tabpage, err);

View File

@ -52,6 +52,7 @@ ArrayOf(Integer, 2) window_get_cursor(Window window, Error *err)
/// @param pos the (row, col) tuple representing the new position
/// @param[out] err Details of an error that may have occurred
void window_set_cursor(Window window, ArrayOf(Integer, 2) pos, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -111,6 +112,7 @@ Integer window_get_height(Window window, Error *err)
/// @param height the new height in rows
/// @param[out] err Details of an error that may have occurred
void window_set_height(Window window, Integer height, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -154,6 +156,7 @@ Integer window_get_width(Window window, Error *err)
/// @param width the new width in columns
/// @param[out] err Details of an error that may have occurred
void window_set_width(Window window, Integer width, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -199,6 +202,7 @@ Object window_get_var(Window window, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
/// @return The old value
Object window_set_var(Window window, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);
@ -234,6 +238,7 @@ Object window_get_option(Window window, String name, Error *err)
/// @param value The option value
/// @param[out] err Details of an error that may have occurred
void window_set_option(Window window, String name, Object value, Error *err)
FUNC_ATTR_DEFERRED
{
win_T *win = find_window_by_handle(window, err);

View File

@ -18,6 +18,8 @@
#include <stdbool.h>
#include <math.h>
#include "nvim/lib/klist.h"
#include "nvim/assert.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
@ -81,11 +83,12 @@
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/os/dl.h"
#include "nvim/os/provider.h"
#include "nvim/os/event.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@ -443,6 +446,16 @@ static dictitem_T vimvars_var; /* variable used for v: */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
// Memory pool for reusing JobEvent structures
typedef struct {
Job *job;
RStream *rstream;
char *type;
} JobEvent;
#define JobEventFreer(x)
KMEMPOOL_INIT(JobEventPool, JobEvent, JobEventFreer)
kmempool_t(JobEventPool) *job_event_pool = NULL;
/*
* Initialize the global and v: variables.
*/
@ -478,6 +491,7 @@ void eval_init(void)
set_vim_var_nr(VV_HLSEARCH, 1L);
set_reg_var(0); /* default for v:register is not 0 but '"' */
job_event_pool = kmp_init(JobEventPool);
}
#if defined(EXITFREE) || defined(PROTO)
@ -19508,35 +19522,55 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
return ret;
}
// JobActivity autocommands will execute vimscript code, so it must be executed
// on Nvim main loop
#define push_job_event(j, r, t) \
do { \
JobEvent *event_data = kmp_alloc(JobEventPool, job_event_pool); \
event_data->job = j; \
event_data->rstream = r; \
event_data->type = t; \
event_push((Event) { \
.handler = on_job_event, \
.data = event_data \
}); \
} while(0)
static void on_job_stdout(RStream *rstream, void *data, bool eof)
{
if (!eof) {
on_job_data(rstream, data, eof, "stdout");
push_job_event(data, rstream, "stdout");
}
}
static void on_job_stderr(RStream *rstream, void *data, bool eof)
{
if (!eof) {
on_job_data(rstream, data, eof, "stderr");
push_job_event(data, rstream, "stderr");
}
}
static void on_job_exit(Job *job, void *data)
{
apply_job_autocmds(job, data, "exit", NULL);
free(data);
push_job_event(data, NULL, "exit");
}
static void on_job_data(RStream *rstream, void *data, bool eof, char *type)
static void on_job_event(Event event)
{
Job *job = data;
uint32_t read_count = rstream_pending(rstream);
char *str = xmalloc(read_count + 1);
JobEvent *data = event.data;
Job *job = data->job;
char *str = NULL;
rstream_read(rstream, str, read_count);
str[read_count] = NUL;
apply_job_autocmds(job, job_data(job), type, str);
if (data->rstream) {
// Read event
size_t read_count = rstream_pending(data->rstream);
str = xmalloc(read_count + 1);
rstream_read(data->rstream, str, read_count);
str[read_count] = NUL;
}
apply_job_autocmds(job, job_data(job), data->type, str);
kmp_free(JobEventPool, job_event_pool, data);
}
static void apply_job_autocmds(Job *job, char *name, char *type, char *str)

View File

@ -179,6 +179,7 @@
#endif
#ifdef DEFINE_FUNC_ATTRIBUTES
#define FUNC_ATTR_DEFERRED
#define FUNC_ATTR_MALLOC REAL_FATTR_MALLOC
#define FUNC_ATTR_ALLOC_SIZE(x) REAL_FATTR_ALLOC_SIZE(x)
#define FUNC_ATTR_ALLOC_SIZE_PROD(x,y) REAL_FATTR_ALLOC_SIZE_PROD(x,y)

View File

@ -39,6 +39,8 @@
static inline kmp_##name##_t *kmp_init_##name(void) { \
return xcalloc(1, sizeof(kmp_##name##_t)); \
} \
static inline void kmp_destroy_##name(kmp_##name##_t *mp) \
REAL_FATTR_UNUSED; \
static inline void kmp_destroy_##name(kmp_##name##_t *mp) { \
size_t k; \
for (k = 0; k < mp->n; ++k) { \

View File

@ -59,7 +59,7 @@
#include "nvim/os/input.h"
#include "nvim/os/os.h"
#include "nvim/os/signal.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/api/private/helpers.h"

View File

@ -6,7 +6,7 @@
#include "nvim/map_defs.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/msgpack_rpc/defs.h"
#include "nvim/lib/khash.h"
@ -108,4 +108,5 @@ MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER)
MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(String, rpc_method_handler_fn, DEFAULT_INITIALIZER)
#define MSGPACK_HANDLER_INITIALIZER {.fn = NULL, .defer = false}
MAP_IMPL(String, MsgpackRpcRequestHandler, MSGPACK_HANDLER_INITIALIZER)

View File

@ -5,7 +5,7 @@
#include "nvim/map_defs.h"
#include "nvim/api/private/defs.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/msgpack_rpc/defs.h"
#define MAP_DECLS(T, U) \
KHASH_DECLARE(T##_##U##_map, T, U) \
@ -25,7 +25,7 @@ MAP_DECLS(cstr_t, uint64_t)
MAP_DECLS(cstr_t, ptr_t)
MAP_DECLS(ptr_t, ptr_t)
MAP_DECLS(uint64_t, ptr_t)
MAP_DECLS(String, rpc_method_handler_fn)
MAP_DECLS(String, MsgpackRpcRequestHandler)
#define map_new(T, U) map_##T##_##U##_new
#define map_free(T, U) map_##T##_##U##_free

View File

@ -5,9 +5,11 @@
#include <uv.h>
#include <msgpack.h>
#include "nvim/lib/klist.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/os/channel.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
@ -15,8 +17,7 @@
#include "nvim/os/wstream_defs.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
@ -32,14 +33,14 @@
typedef struct {
uint64_t request_id;
bool errored;
bool returned, errored;
Object result;
} ChannelCallFrame;
typedef struct {
uint64_t id;
PMap(cstr_t) *subscribed_events;
bool is_job, enabled;
bool is_job, closed;
msgpack_unpacker *unpacker;
union {
Job *job;
@ -51,21 +52,32 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
size_t rpc_call_level;
} Channel;
typedef struct {
Channel *channel;
MsgpackRpcRequestHandler handler;
Array args;
uint64_t request_id;
} RequestEvent;
#define RequestEventFreer(x)
KMEMPOOL_INIT(RequestEventPool, RequestEvent, RequestEventFreer)
kmempool_t(RequestEventPool) *request_event_pool = NULL;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/channel.c.generated.h"
# include "msgpack_rpc/channel.c.generated.h"
#endif
/// Initializes the module
void channel_init(void)
{
request_event_pool = kmp_init(RequestEventPool);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@ -104,12 +116,12 @@ uint64_t channel_from_job(char **argv)
channel,
job_out,
job_err,
NULL,
job_exit,
0,
&status);
if (status <= 0) {
close_channel(channel);
free_channel(channel);
return 0;
}
@ -128,8 +140,7 @@ void channel_from_stream(uv_stream_t *stream)
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
channel,
NULL);
channel);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
@ -142,7 +153,7 @@ bool channel_exists(uint64_t id)
{
Channel *channel;
return (channel = pmap_get(uint64_t)(channels, id)) != NULL
&& channel->enabled;
&& !channel->closed;
}
/// Sends event/arguments to channel
@ -157,7 +168,7 @@ bool channel_send_event(uint64_t id, char *name, Array args)
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
api_free_array(args);
return false;
}
@ -183,7 +194,7 @@ Object channel_send_call(uint64_t id,
{
Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
api_free_array(args);
return NIL;
@ -203,22 +214,11 @@ Object channel_send_call(uint64_t id,
// Send the msgpack-rpc request
send_request(channel, request_id, method_name, args);
EventSource channel_source = channel->is_job
? job_event_source(channel->data.job)
: rstream_event_source(channel->data.streams.read);
EventSource sources[] = {channel_source, NULL};
// Push the frame
ChannelCallFrame frame = {request_id, false, NIL};
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
size_t size = kv_size(channel->call_stack);
do {
event_poll(-1, sources);
} while (
// Continue running if ...
channel->enabled && // the channel is still enabled
kv_size(channel->call_stack) >= size); // the call didn't return
event_poll_until(-1, frame.returned);
(void)kv_pop(channel->call_stack);
if (frame.errored) {
api_set_error(err, Exception, "%s", frame.result.data.string.data);
@ -236,7 +236,7 @@ void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
abort();
}
@ -258,7 +258,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
abort();
}
@ -273,12 +273,11 @@ bool channel_close(uint64_t id)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
return false;
}
channel_kill(channel);
channel->enabled = false;
close_channel(channel);
return true;
}
@ -291,8 +290,7 @@ static void channel_from_stdio(void)
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
channel,
NULL);
channel);
rstream_set_file(channel->data.streams.read, 0);
rstream_start(channel->data.streams.read);
// write stream
@ -320,23 +318,20 @@ static void job_err(RStream *rstream, void *data, bool eof)
}
}
static void job_exit(Job *job, void *data)
{
free_channel((Channel *)data);
}
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
Channel *channel = data;
channel->rpc_call_level++;
if (eof) {
char buf[256];
snprintf(buf,
sizeof(buf),
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
call_set_error(channel, buf);
goto end;
}
uint32_t count = rstream_pending(rstream);
size_t count = rstream_pending(rstream);
DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
count,
rstream);
@ -355,7 +350,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
MSGPACK_UNPACK_SUCCESS) {
if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
if (is_valid_rpc_response(&unpacked.data, channel)) {
call_stack_pop(&unpacked.data, channel);
complete_call(&unpacked.data, channel);
} else {
char buf[256];
snprintf(buf,
@ -371,12 +366,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
goto end;
}
// Perform the call
WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
// write the response
if (!channel_write(channel, resp)) {
goto end;
}
handle_request(channel, &unpacked.data);
}
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
@ -398,13 +388,92 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
}
end:
channel->rpc_call_level--;
if (!channel->enabled && !kv_size(channel->call_stack)) {
// Now it's safe to destroy the channel
close_channel(channel);
if (eof && !channel->is_job && !kv_size(channel->call_stack)) {
// The free_channel call is deferred for jobs because it's possible that
// job_stderr will called after this. For non-job channels, this is the
// last callback so it must be freed now.
free_channel(channel);
}
}
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
uint64_t request_id;
Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error);
if (error.set) {
// Validation failed, send response with error
channel_write(channel,
serialize_response(request_id, &error, NIL, &out_buffer));
return;
}
// Retrieve the request handler
MsgpackRpcRequestHandler handler;
msgpack_object method = request->via.array.ptr[2];
if (method.type == MSGPACK_OBJECT_BIN || method.type == MSGPACK_OBJECT_STR) {
handler = msgpack_rpc_get_handler_for(method.via.bin.ptr,
method.via.bin.size);
} else {
handler.fn = msgpack_rpc_handle_missing_method;
handler.defer = false;
}
Array args;
msgpack_rpc_to_array(request->via.array.ptr + 3, &args);
if (kv_size(channel->call_stack) || !handler.defer) {
call_request_handler(channel, handler, args, request_id);
return;
}
// Defer calling the request handler.
RequestEvent *event_data = kmp_alloc(RequestEventPool, request_event_pool);
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
event_push((Event) {
.handler = on_request_event,
.data = event_data
});
}
static void on_request_event(Event event)
{
RequestEvent *e = event.data;
call_request_handler(e->channel, e->handler, e->args, e->request_id);
kmp_free(RequestEventPool, request_event_pool, e);
}
static void call_request_handler(Channel *channel,
MsgpackRpcRequestHandler handler,
Array args,
uint64_t request_id)
{
Error error = ERROR_INIT;
Object result = handler.fn(channel->id, request_id, args, &error);
// send the response
msgpack_packer response;
msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
if (error.set) {
ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
error.msg,
request_id);
channel_write(channel,
serialize_response(request_id, &error, NIL, &out_buffer));
}
DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
request_id);
channel_write(channel,
serialize_response(request_id, &error, result, &out_buffer));
}
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;
@ -501,26 +570,11 @@ static void unsubscribe(Channel *channel, char *event)
free(event_string);
}
/// Close the channel streams/job. The channel resources will be freed by
/// free_channel later.
static void close_channel(Channel *channel)
{
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
channel_kill(channel);
free(channel);
}
static void channel_kill(Channel *channel)
{
channel->closed = true;
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
@ -537,6 +591,22 @@ static void channel_kill(Channel *channel)
}
}
static void free_channel(Channel *channel)
{
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
free(channel);
}
static void close_cb(uv_handle_t *handle)
{
free(handle->data);
@ -546,8 +616,7 @@ static void close_cb(uv_handle_t *handle)
static Channel *register_channel(void)
{
Channel *rv = xmalloc(sizeof(Channel));
rv->enabled = true;
rv->rpc_call_level = 0;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)();
@ -574,9 +643,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
kv_size(channel->call_stack) - 1)->request_id;
}
static void call_stack_pop(msgpack_object *obj, Channel *channel)
static void complete_call(msgpack_object *obj, Channel *channel)
{
ChannelCallFrame *frame = kv_pop(channel->call_stack);
ChannelCallFrame *frame = kv_A(channel->call_stack,
kv_size(channel->call_stack) - 1);
frame->returned = true;
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
if (frame->errored) {
@ -589,10 +660,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
static void call_set_error(Channel *channel, char *msg)
{
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
frame->returned = true;
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
channel->enabled = false;
close_channel(channel);
}

View File

@ -1,5 +1,5 @@
#ifndef NVIM_OS_CHANNEL_H
#define NVIM_OS_CHANNEL_H
#ifndef NVIM_MSGPACK_RPC_CHANNEL_H
#define NVIM_MSGPACK_RPC_CHANNEL_H
#include <stdbool.h>
#include <uv.h>
@ -10,6 +10,6 @@
#define METHOD_MAXLEN 512
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/channel.h.generated.h"
# include "msgpack_rpc/channel.h.generated.h"
#endif
#endif // NVIM_OS_CHANNEL_H
#endif // NVIM_MSGPACK_RPC_CHANNEL_H

View File

@ -1,29 +1,23 @@
#ifndef NVIM_OS_MSGPACK_RPC_H
#define NVIM_OS_MSGPACK_RPC_H
#include <stdint.h>
#ifndef NVIM_MSGPACK_RPC_DEFS_H
#define NVIM_MSGPACK_RPC_DEFS_H
#include <msgpack.h>
#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
#include "nvim/os/wstream.h"
typedef enum {
kUnpackResultOk, /// Successfully parsed a document
kUnpackResultFail, /// Got unexpected input
kUnpackResultNeedMore /// Need more data
} UnpackResult;
/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores
/// functions of this type.
typedef Object (*rpc_method_handler_fn)(uint64_t channel_id,
msgpack_object *req,
Error *error);
typedef struct {
Object (*fn)(uint64_t channel_id,
uint64_t request_id,
Array args,
Error *error);
bool defer; // Should the call be deferred to the main loop? This should
// be true if the function mutates editor data structures such
// as buffers, windows, tabs, or if it executes vimscript code.
} MsgpackRpcRequestHandler;
/// Initializes the msgpack-rpc method table
void msgpack_rpc_init(void);
void msgpack_rpc_init_method_table(void);
void msgpack_rpc_init_function_metadata(Dictionary *metadata);
@ -43,9 +37,7 @@ Object msgpack_rpc_dispatch(uint64_t channel_id,
Error *error)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc.h.generated.h"
#endif
#endif // NVIM_OS_MSGPACK_RPC_H
MsgpackRpcRequestHandler msgpack_rpc_get_handler_for(const char *name,
size_t name_len)
FUNC_ATTR_NONNULL_ARG(1);
#endif // NVIM_MSGPACK_RPC_DEFS_H

View File

@ -1,14 +1,18 @@
#include <stdint.h>
#include <stdbool.h>
#include <inttypes.h>
#include <msgpack.h>
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/msgpack_rpc/defs.h"
#include "nvim/vim.h"
#include "nvim/log.h"
#include "nvim/memory.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc_helpers.c.generated.h"
# include "msgpack_rpc/helpers.c.generated.h"
#endif
static msgpack_zone zone;
@ -136,10 +140,13 @@ bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
case MSGPACK_OBJECT_EXT:
switch (obj->via.ext.type) {
case kObjectTypeBuffer:
arg->type = kObjectTypeBuffer;
return msgpack_rpc_to_buffer(obj, &arg->data.buffer);
case kObjectTypeWindow:
arg->type = kObjectTypeWindow;
return msgpack_rpc_to_window(obj, &arg->data.window);
case kObjectTypeTabpage:
arg->type = kObjectTypeTabpage;
return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage);
}
default:
@ -287,3 +294,136 @@ void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
msgpack_rpc_from_object(result.items[i].value, res);
}
}
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
/// @param res A packer that contains the response
void msgpack_rpc_error(char *msg, msgpack_packer *res)
FUNC_ATTR_NONNULL_ALL
{
size_t len = strlen(msg);
// error message
msgpack_pack_bin(res, len);
msgpack_pack_bin_body(res, msg, len);
// Nil result
msgpack_pack_nil(res);
}
/// Handler executed when an invalid method name is passed
Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
uint64_t request_id,
Array args,
Error *error)
{
snprintf(error->msg, sizeof(error->msg), "Invalid method name");
error->set = true;
return NIL;
}
/// Serializes a msgpack-rpc request or notification(id == 0)
WBuffer *serialize_request(uint64_t request_id,
String method,
Array args,
msgpack_sbuffer *sbuffer,
size_t refcount)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, request_id ? 4 : 3);
msgpack_pack_int(&pac, request_id ? 0 : 2);
if (request_id) {
msgpack_pack_uint64(&pac, request_id);
}
msgpack_pack_bin(&pac, method.size);
msgpack_pack_bin_body(&pac, method.data, method.size);
msgpack_rpc_from_array(args, &pac);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
refcount,
free);
api_free_array(args);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
/// Serializes a msgpack-rpc response
WBuffer *serialize_response(uint64_t response_id,
Error *err,
Object arg,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(2, 4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, 4);
msgpack_pack_int(&pac, 1);
msgpack_pack_uint64(&pac, response_id);
if (err->set) {
// error represented by a [type, message] array
msgpack_pack_array(&pac, 2);
msgpack_rpc_from_integer(err->type, &pac);
msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
// Nil result
msgpack_pack_nil(&pac);
} else {
// Nil error
msgpack_pack_nil(&pac);
// Return value
msgpack_rpc_from_object(arg, &pac);
}
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
1, // responses only go though 1 channel
free);
api_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
void msgpack_rpc_validate(uint64_t *response_id,
msgpack_object *req,
Error *err)
{
// response id not known yet
*response_id = 0;
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
api_set_error(err, Validation, _("Request is not an array"));
}
if (req->via.array.size != 4) {
api_set_error(err, Validation, _("Request array size should be 4"));
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
api_set_error(err, Validation, _("Id must be a positive integer"));
}
// Set the response id, which is the same as the request
*response_id = req->via.array.ptr[1].via.u64;
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
api_set_error(err, Validation, _("Message type must be an integer"));
}
if (req->via.array.ptr[0].via.u64 != 0) {
api_set_error(err, Validation, _("Message type must be 0"));
}
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
&& req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
api_set_error(err, Validation, _("Method must be a string"));
}
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
api_set_error(err, Validation, _("Paremeters must be an array"));
}
}

View File

@ -0,0 +1,17 @@
#ifndef NVIM_MSGPACK_RPC_HELPERS_H
#define NVIM_MSGPACK_RPC_HELPERS_H
#include <stdint.h>
#include <stdbool.h>
#include <msgpack.h>
#include "nvim/os/wstream.h"
#include "nvim/api/private/defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/helpers.h.generated.h"
#endif
#endif // NVIM_MSGPACK_RPC_HELPERS_H

View File

@ -5,8 +5,8 @@
#include <uv.h>
#include "nvim/os/channel.h"
#include "nvim/os/server.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/os.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
@ -46,7 +46,7 @@ typedef struct {
static PMap(cstr_t) *servers = NULL;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/server.c.generated.h"
# include "msgpack_rpc/server.c.generated.h"
#endif
/// Initializes the module
@ -119,7 +119,8 @@ int server_start(const char *endpoint)
ip_end = strchr(addr, NUL);
}
uint32_t addr_len = ip_end - addr;
// (ip_end - addr) is always > 0, so convert to size_t
size_t addr_len = (size_t)(ip_end - addr);
if (addr_len > sizeof(ip) - 1) {
// Maximum length of an IP address buffer is 15(eg: 255.255.255.255)

View File

@ -0,0 +1,7 @@
#ifndef NVIM_MSGPACK_RPC_SERVER_H
#define NVIM_MSGPACK_RPC_SERVER_H
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/server.h.generated.h"
#endif
#endif // NVIM_MSGPACK_RPC_SERVER_H

View File

@ -7,8 +7,10 @@
#include "nvim/os/event.h"
#include "nvim/os/input.h"
#include "nvim/os/channel.h"
#include "nvim/os/server.h"
#include "nvim/msgpack_rpc/defs.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/os/provider.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream.h"
@ -25,25 +27,22 @@ KLIST_INIT(Event, Event, _destroy_event)
typedef struct {
bool timed_out;
int32_t ms;
int ms;
uv_timer_t *timer;
} TimerData;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
static klist_t(Event) *deferred_events, *immediate_events;
// NULL-terminated array of event sources that we should process immediately.
//
// Events from sources that are not contained in this array are processed
// later when `event_process` is called
static EventSource *immediate_sources = NULL;
static klist_t(Event) *pending_events;
void event_init(void)
{
// early msgpack-rpc initialization
msgpack_rpc_init_method_table();
msgpack_rpc_helpers_init();
// Initialize the event queues
deferred_events = kl_init(Event);
immediate_events = kl_init(Event);
pending_events = kl_init(Event);
// Initialize input events
input_init();
// Timer to wake the event loop if a timeout argument is passed to
@ -52,9 +51,8 @@ void event_init(void)
signal_init();
// Jobs
job_init();
// Channels
// finish mspgack-rpc initialization
channel_init();
// Servers
server_init();
// Providers
provider_init();
@ -68,8 +66,7 @@ void event_teardown(void)
}
// Wait for some event
bool event_poll(int32_t ms, EventSource sources[])
FUNC_ATTR_NONNULL_ARG(2)
void event_poll(int ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
@ -100,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[])
run_mode = UV_RUN_NOWAIT;
}
size_t processed_events;
do {
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
processed_events = loop(run_mode, sources);
} while (
// Continue running if ...
!processed_events && // we didn't process any immediate events
!event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
!timer_data.timed_out); // we didn't get a timeout
loop(run_mode);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
@ -123,68 +109,29 @@ bool event_poll(int32_t ms, EventSource sources[])
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
processed_events += loop(UV_RUN_NOWAIT, sources);
loop(UV_RUN_NOWAIT);
}
return !timer_data.timed_out && (processed_events || event_has_deferred());
}
bool event_has_deferred(void)
{
return !kl_empty(deferred_events);
return !kl_empty(pending_events);
}
// Queue an event
void event_push(Event event)
{
bool defer = true;
if (immediate_sources) {
size_t i;
EventSource src;
for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
if (src == event.source) {
defer = false;
break;
}
}
}
*kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
*kl_pushp(Event, pending_events) = event;
}
void event_process(void)
{
process_from(deferred_events);
}
// Runs the appropriate action for each queued event
static size_t process_from(klist_t(Event) *queue)
{
size_t count = 0;
Event event;
while (kl_shift(Event, queue, &event) == 0) {
switch (event.type) {
case kEventSignal:
signal_handle(event);
break;
case kEventRStreamData:
rstream_read_event(event);
break;
case kEventJobExit:
job_exit_event(event);
break;
default:
abort();
}
count++;
while (kl_shift(Event, pending_events, &event) == 0) {
event.handler(event);
}
DLOG("Processed %u events", count);
return count;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
@ -202,42 +149,9 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle);
}
static void requeue_deferred_events(void)
static void loop(uv_run_mode run_mode)
{
size_t remaining = deferred_events->size;
DLOG("Number of deferred events: %u", remaining);
while (remaining--) {
// Re-push each deferred event to ensure it will be in the right queue
Event event;
kl_shift(Event, deferred_events, &event);
event_push(event);
DLOG("Re-queueing event");
}
DLOG("Number of deferred events: %u", deferred_events->size);
}
static size_t loop(uv_run_mode run_mode, EventSource *sources)
{
size_t count;
immediate_sources = sources;
// It's possible that some events from the immediate sources are waiting
// in the deferred queue. If so, move them to the immediate queue so they
// will be processed in order of arrival by the next `process_from` call.
requeue_deferred_events();
count = process_from(immediate_events);
if (count) {
// No need to enter libuv, events were already processed
return count;
}
DLOG("Enter event loop");
uv_run(uv_default_loop(), run_mode);
DLOG("Exit event loop");
immediate_sources = NULL;
count = process_from(immediate_events);
return count;
}

View File

@ -6,6 +6,27 @@
#include "nvim/os/event_defs.h"
#include "nvim/os/job_defs.h"
#include "nvim/os/time.h"
// Poll for events until a condition is true or a timeout has passed
#define event_poll_until(timeout, condition) \
do { \
int remaining = timeout; \
uint64_t before = (remaining > 0) ? os_hrtime() : 0; \
while (!(condition)) { \
event_poll(remaining); \
if (remaining == 0) { \
break; \
} else if (remaining > 0) { \
uint64_t now = os_hrtime(); \
remaining -= (int) ((now - before) / 1000000); \
before = now; \
if (remaining <= 0) { \
break; \
} \
} \
} \
} while (0)
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.h.generated.h"

View File

@ -6,25 +6,12 @@
#include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h"
typedef void * EventSource;
typedef struct event Event;
typedef void (*event_handler)(Event event);
typedef enum {
kEventSignal,
kEventRStreamData,
kEventJobExit
} EventType;
typedef struct {
EventSource source;
EventType type;
union {
int signum;
struct {
RStream *ptr;
bool eof;
} rstream;
Job *job;
} data;
} Event;
struct event {
void *data;
event_handler handler;
};
#endif // NVIM_OS_EVENT_DEFS_H

View File

@ -7,7 +7,6 @@
#include "nvim/api/private/defs.h"
#include "nvim/os/input.h"
#include "nvim/os/event.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/ascii.h"
@ -48,10 +47,7 @@ void input_init(void)
}
read_buffer = rbuffer_new(READ_BUFFER_SIZE);
read_stream = rstream_new(read_cb,
read_buffer,
NULL,
NULL);
read_stream = rstream_new(read_cb, read_buffer, NULL);
rstream_set_file(read_stream, read_cmd_fd);
}
@ -76,7 +72,7 @@ void input_stop(void)
}
// Low level input function.
int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt)
{
InbufPollResult result;
@ -90,7 +86,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
return 0;
}
} else {
if ((result = inbuf_poll(p_ut)) == kInputNone) {
if ((result = inbuf_poll((int)p_ut)) == kInputNone) {
if (trigger_cursorhold() && maxlen >= 3
&& !typebuf_changed(tb_change_cnt)) {
buf[0] = K_SPECIAL;
@ -120,7 +116,9 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
}
convert_input();
return rbuffer_read(input_buffer, (char *)buf, maxlen);
// Safe to convert rbuffer_read to int, it will never overflow since
// we use relatively small buffers.
return (int)rbuffer_read(input_buffer, (char *)buf, (size_t)maxlen);
}
// Check if a character is available for reading
@ -167,23 +165,14 @@ void input_buffer_restore(String str)
free(str.data);
}
static bool input_poll(int32_t ms)
static bool input_poll(int ms)
{
if (embedded_mode) {
EventSource input_sources[] = { signal_event_source(), NULL };
return event_poll(ms, input_sources);
}
EventSource input_sources[] = {
rstream_event_source(read_stream),
NULL
};
return input_ready() || event_poll(ms, input_sources) || input_ready();
event_poll_until(ms, input_ready());
return input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c
static InbufPollResult inbuf_poll(int32_t ms)
static InbufPollResult inbuf_poll(int ms)
{
if (typebuf_was_filled || rbuffer_pending(input_buffer)) {
return kInputAvail;
@ -235,7 +224,7 @@ static void read_cb(RStream *rstream, void *data, bool at_eof)
static void convert_input(void)
{
if (!rbuffer_available(input_buffer)) {
if (embedded_mode || !rbuffer_available(input_buffer)) {
// No input buffer space
return;
}
@ -273,9 +262,9 @@ static void convert_input(void)
char *inbuf = rbuffer_read_ptr(input_buffer);
size_t count = rbuffer_pending(input_buffer), consume_count = 0;
for (int i = count - 1; i >= 0; i--) {
for (int i = (int)count - 1; i >= 0; i--) {
if (inbuf[i] == 3) {
consume_count = i + 1;
consume_count = (size_t)i;
break;
}
}
@ -304,6 +293,10 @@ static int push_event_key(uint8_t *buf, int maxlen)
// Check if there's pending input
static bool input_ready(void)
{
return rstream_pending(read_stream) > 0 || eof;
return typebuf_was_filled || // API call filled typeahead
event_has_deferred() || // Events must be processed
(!embedded_mode && (
rstream_pending(read_stream) > 0 || // Stdin input
eof)); // Stdin closed
}

View File

@ -12,9 +12,7 @@
#include "nvim/os/wstream_defs.h"
#include "nvim/os/event.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/time.h"
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/term.h"
@ -99,25 +97,28 @@ void job_teardown(void)
// their status with `wait` or handling SIGCHLD. libuv does that
// automatically (and then calls `exit_cb`) but we have to give it a chance
// by running the loop one more time
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
event_poll(0);
// Prepare to start shooting
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
if ((job = table[i]) == NULL) {
continue;
}
job = table[i];
// Still alive
while (is_alive(job) && remaining_tries--) {
while (job && is_alive(job) && remaining_tries--) {
os_delay(50, 0);
// Acknowledge child exits
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
event_poll(0);
// It's possible that the event_poll call removed the job from the table,
// reset 'job' so the next iteration won't run in that case.
job = table[i];
}
if (is_alive(job)) {
if (job && is_alive(job)) {
uv_process_kill(&job->proc, SIGKILL);
}
}
// Last run to ensure all children were removed
event_poll(0);
}
/// Tries to start a new job.
@ -213,14 +214,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
job->out = rstream_new(read_cb,
rbuffer_new(JOB_BUFFER_SIZE),
job,
job_event_source(job));
job->err = rstream_new(read_cb,
rbuffer_new(JOB_BUFFER_SIZE),
job,
job_event_source(job));
job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@ -277,47 +272,33 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
int old_mode = cur_tmode;
settmode(TMODE_COOK);
EventSource sources[] = {job_event_source(job), signal_event_source(), NULL};
// Increase pending_refs to stop the exit_cb from being called, which
// could result in the job being freed before we have a chance
// to get the status.
job->pending_refs++;
event_poll_until(ms,
// Until...
got_int || // interrupted by the user
job->pending_refs == 1); // job exited
job->pending_refs--;
// keep track of the elapsed time if ms > 0
uint64_t before = (ms > 0) ? os_hrtime() : 0;
while (1) {
// check if the job has exited (and the status is available).
if (job->pending_refs == 0) {
break;
}
event_poll(ms, sources);
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
if (got_int) {
job_stop(job);
}
if (ms == 0) {
break;
}
// check if the poll timed out, if not, decrease the ms to wait for the
// next run
if (ms > 0) {
uint64_t now = os_hrtime();
ms -= (int) ((now - before) / 1000000);
before = now;
// if the time elapsed is greater than the `ms` wait time, break
if (ms <= 0) {
break;
}
}
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
if (got_int) {
job_stop(job);
event_poll(0);
}
settmode(old_mode);
// return -1 for a timeout, the job status otherwise
return (job->pending_refs) ? -1 : (int) job->status;
if (!job->pending_refs) {
int status = (int) job->status;
job_exit_callback(job);
return status;
}
// return -1 for a timeout
return -1;
}
/// Close the pipe used to write to the job.
@ -369,14 +350,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
/// Runs the read callback associated with the job exit event
///
/// @param event Object containing data necessary to invoke the callback
void job_exit_event(Event event)
{
job_exit_callback(event.data.job);
}
/// Get the job id
///
/// @param job A pointer to the job
@ -395,11 +368,6 @@ void *job_data(Job *job)
return job->data;
}
EventSource job_event_source(Job *job)
{
return job;
}
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@ -470,7 +438,7 @@ static void read_cb(RStream *rstream, void *data, bool eof)
}
if (eof && --job->pending_refs == 0) {
emit_exit_event(job);
job_exit_callback(job);
}
}
@ -481,20 +449,10 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
job->status = status;
if (--job->pending_refs == 0) {
emit_exit_event(job);
job_exit_callback(job);
}
}
static void emit_exit_event(Job *job)
{
Event event = {
.source = job_event_source(job),
.type = kEventJobExit,
.data.job = job
};
event_push(event);
}
static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);

View File

@ -1,188 +0,0 @@
#include <stdint.h>
#include <stdbool.h>
#include <inttypes.h>
#include <msgpack.h>
#include "nvim/vim.h"
#include "nvim/log.h"
#include "nvim/memory.h"
#include "nvim/os/wstream.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
#include "nvim/func_attr.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc.c.generated.h"
#endif
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
/// @param channel_id The channel id
/// @param req The parsed request object
/// @param res A packer that contains the response
WBuffer *msgpack_rpc_call(uint64_t channel_id,
msgpack_object *req,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(2)
FUNC_ATTR_NONNULL_ARG(3)
{
uint64_t response_id;
Error error = ERROR_INIT;
msgpack_rpc_validate(&response_id, req, &error);
if (error.set) {
return serialize_response(response_id, &error, NIL, sbuffer);
}
// dispatch the call
Object rv = msgpack_rpc_dispatch(channel_id, req, &error);
// send the response
msgpack_packer response;
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
if (error.set) {
ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
error.msg,
response_id);
return serialize_response(response_id, &error, NIL, sbuffer);
}
DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
response_id);
return serialize_response(response_id, &error, rv, sbuffer);
}
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message
/// @param res A packer that contains the response
void msgpack_rpc_error(char *msg, msgpack_packer *res)
FUNC_ATTR_NONNULL_ALL
{
size_t len = strlen(msg);
// error message
msgpack_pack_bin(res, len);
msgpack_pack_bin_body(res, msg, len);
// Nil result
msgpack_pack_nil(res);
}
/// Handler executed when an invalid method name is passed
Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
msgpack_object *req,
Error *error)
{
snprintf(error->msg, sizeof(error->msg), "Invalid method name");
error->set = true;
return NIL;
}
/// Serializes a msgpack-rpc request or notification(id == 0)
WBuffer *serialize_request(uint64_t request_id,
String method,
Array args,
msgpack_sbuffer *sbuffer,
size_t refcount)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, request_id ? 4 : 3);
msgpack_pack_int(&pac, request_id ? 0 : 2);
if (request_id) {
msgpack_pack_uint64(&pac, request_id);
}
msgpack_pack_bin(&pac, method.size);
msgpack_pack_bin_body(&pac, method.data, method.size);
msgpack_rpc_from_array(args, &pac);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
refcount,
free);
api_free_array(args);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
/// Serializes a msgpack-rpc response
WBuffer *serialize_response(uint64_t response_id,
Error *err,
Object arg,
msgpack_sbuffer *sbuffer)
FUNC_ATTR_NONNULL_ARG(2, 4)
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&pac, 4);
msgpack_pack_int(&pac, 1);
msgpack_pack_uint64(&pac, response_id);
if (err->set) {
// error represented by a [type, message] array
msgpack_pack_array(&pac, 2);
msgpack_rpc_from_integer(err->type, &pac);
msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
// Nil result
msgpack_pack_nil(&pac);
} else {
// Nil error
msgpack_pack_nil(&pac);
// Return value
msgpack_rpc_from_object(arg, &pac);
}
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
1, // responses only go though 1 channel
free);
api_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
return rv;
}
static void msgpack_rpc_validate(uint64_t *response_id,
msgpack_object *req,
Error *err)
{
// response id not known yet
*response_id = 0;
// Validate the basic structure of the msgpack-rpc payload
if (req->type != MSGPACK_OBJECT_ARRAY) {
api_set_error(err, Validation, _("Request is not an array"));
}
if (req->via.array.size != 4) {
api_set_error(err, Validation, _("Request array size should be 4"));
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
api_set_error(err, Validation, _("Id must be a positive integer"));
}
// Set the response id, which is the same as the request
*response_id = req->via.array.ptr[1].via.u64;
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
api_set_error(err, Validation, _("Message type must be an integer"));
}
if (req->via.array.ptr[0].via.u64 != 0) {
api_set_error(err, Validation, _("Message type must be 0"));
}
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
&& req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
api_set_error(err, Validation, _("Method must be a string"));
}
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
api_set_error(err, Validation, _("Paremeters must be an array"));
}
}

View File

@ -1,16 +0,0 @@
#ifndef NVIM_OS_MSGPACK_RPC_HELPERS_H
#define NVIM_OS_MSGPACK_RPC_HELPERS_H
#include <stdint.h>
#include <stdbool.h>
#include <msgpack.h>
#include "nvim/api/private/defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/msgpack_rpc_helpers.h.generated.h"
#endif
#endif // NVIM_OS_MSGPACK_RPC_HELPERS_H

View File

@ -8,7 +8,7 @@
#include "nvim/api/vim.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/os/channel.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/os/shell.h"
#include "nvim/os/os.h"
#include "nvim/log.h"

View File

@ -8,8 +8,6 @@
#include "nvim/os/uv_helpers.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/event.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
@ -33,7 +31,6 @@ struct rstream {
uv_file fd;
rstream_cb cb;
bool free_handle;
EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@ -76,18 +73,13 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count)
void rbuffer_produced(RBuffer *rbuffer, size_t count)
{
rbuffer->wpos += count;
DLOG("Received %u bytes from RStream(address: %p, source: %p)",
(size_t)cnt,
rbuffer->rstream,
rstream_event_source(rbuffer->rstream));
DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rbuffer->rstream);
rbuffer_relocate(rbuffer);
if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
// The last read filled the buffer, stop reading for now
rstream_stop(rbuffer->rstream);
DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
rstream,
rstream_event_source(rstream));
DLOG("Buffer for RStream(%p) is full, stopping it", rstream);
}
}
@ -180,13 +172,8 @@ void rbuffer_free(RBuffer *rbuffer)
/// for reading with `rstream_read`
/// @param buffer RBuffer instance to associate with the RStream
/// @param data Some state to associate with the `RStream` instance
/// @param source_override Replacement for the default source used in events
/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
RBuffer *buffer,
void *data,
EventSource source_override)
RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = buffer;
@ -198,7 +185,6 @@ RStream * rstream_new(rstream_cb cb,
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
rv->source_override = source_override ? source_override : rv;
return rv;
}
@ -322,21 +308,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count)
return rbuffer_read(rstream->buffer, buffer, count);
}
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
void rstream_read_event(Event event)
{
RStream *rstream = event.data.rstream.ptr;
rstream->cb(rstream, rstream->data, event.data.rstream.eof);
}
EventSource rstream_event_source(RStream *rstream)
{
return rstream->source_override;
}
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@ -357,13 +328,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
DLOG("Closing RStream(address: %p, source: %p)",
rstream,
rstream_event_source(rstream));
DLOG("Closing RStream(%p)", rstream);
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
emit_read_event(rstream, true);
rstream->cb(rstream, rstream->data, true);
}
return;
}
@ -374,7 +343,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(rstream->buffer, nread);
emit_read_event(rstream, false);
rstream->cb(rstream, rstream->data, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@ -409,7 +378,6 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
emit_read_event(rstream, true);
return;
}
@ -417,7 +385,6 @@ static void fread_idle_cb(uv_idle_t *handle)
size_t nread = (size_t) req.result;
rbuffer_produced(rstream->buffer, nread);
rstream->fpos += nread;
emit_read_event(rstream, false);
}
static void close_cb(uv_handle_t *handle)
@ -426,19 +393,6 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
static void emit_read_event(RStream *rstream, bool eof)
{
Event event = {
.source = rstream_event_source(rstream),
.type = kEventRStreamData,
.data.rstream = {
.ptr = rstream,
.eof = eof
}
};
event_push(event);
}
static void rbuffer_relocate(RBuffer *rbuffer)
{
// Move data ...

View File

@ -1,7 +0,0 @@
#ifndef NVIM_OS_SERVER_H
#define NVIM_OS_SERVER_H
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/server.h.generated.h"
#endif
#endif // NVIM_OS_SERVER_H

View File

@ -1,7 +0,0 @@
#ifndef NVIM_OS_SERVER_DEFS_H
#define NVIM_OS_SERVER_DEFS_H
typedef struct server Server;
#endif // NVIM_OS_SERVER_DEFS_H

View File

@ -1,4 +1,5 @@
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
@ -7,6 +8,7 @@
#include "nvim/ascii.h"
#include "nvim/lib/kvec.h"
#include "nvim/log.h"
#include "nvim/os/event.h"
#include "nvim/os/job.h"
#include "nvim/os/rstream.h"
#include "nvim/os/shell.h"
@ -58,11 +60,11 @@ typedef struct {
/// `shell_free_argv` when no longer needed.
char **shell_build_argv(const char_u *cmd, const char_u *extra_shell_opt)
{
int argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL);
size_t argc = tokenize(p_sh, NULL) + tokenize(p_shcf, NULL);
char **rv = xmalloc((unsigned)((argc + 4) * sizeof(char *)));
// Split 'shell'
int i = tokenize(p_sh, rv);
size_t i = tokenize(p_sh, rv);
if (extra_shell_opt != NULL) {
// Push a copy of `extra_shell_opt`
@ -212,7 +214,7 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg)
// Keep running the loop until all three handles are completely closed
while (pdata.exited < expected_exits) {
uv_run(uv_default_loop(), UV_RUN_ONCE);
event_poll(0);
if (got_int) {
// Forward SIGINT to the shell
@ -356,9 +358,9 @@ static void system_data_cb(RStream *rstream, void *data, bool eof)
/// @param argv The vector that will be filled with copies of the parsed
/// words. It can be NULL if the caller only needs to count words.
/// @return The number of words parsed.
static int tokenize(const char_u *str, char **argv)
static size_t tokenize(const char_u *str, char **argv)
{
int argc = 0, len;
size_t argc = 0, len;
char_u *p = (char_u *) str;
while (*p != NUL) {
@ -383,11 +385,11 @@ static int tokenize(const char_u *str, char **argv)
///
/// @param str A pointer to the first character of the word
/// @return The offset from `str` at which the word ends.
static int word_length(const char_u *str)
static size_t word_length(const char_u *str)
{
const char_u *p = str;
bool inquote = false;
int length = 0;
size_t length = 0;
// Move `p` to the end of shell word by advancing the pointer while it's
// inside a quote or it's a non-whitespace character
@ -418,15 +420,15 @@ static void write_selection(uv_write_t *req)
// TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and
// only after filled we should start allocating memory(skip unnecessary
// allocations for small writes)
int buflen = BUFFER_LENGTH;
size_t buflen = BUFFER_LENGTH;
pdata->wbuffer = (char *)xmalloc(buflen);
uv_buf_t uvbuf;
linenr_T lnum = curbuf->b_op_start.lnum;
int off = 0;
int written = 0;
size_t off = 0;
size_t written = 0;
char_u *lp = ml_get(lnum);
int l;
int len;
size_t l;
size_t len;
for (;;) {
l = strlen((char *)lp + written);
@ -443,7 +445,7 @@ static void write_selection(uv_write_t *req)
pdata->wbuffer[off++] = NUL;
} else {
char_u *s = vim_strchr(lp + written, NL);
len = s == NULL ? l : s - (lp + written);
len = s == NULL ? l : (size_t)(s - (lp + written));
while (off + len >= buflen) {
// Resize the buffer
buflen *= 2;
@ -584,6 +586,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
ProcessData *data = (ProcessData *)proc->data;
data->exited++;
data->exit_status = status;
assert(status <= INT_MAX);
data->exit_status = (int)status;
uv_close((uv_handle_t *)proc, NULL);
}

View File

@ -12,8 +12,6 @@
#include "nvim/memory.h"
#include "nvim/misc1.h"
#include "nvim/misc2.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/event.h"
#include "nvim/os/signal.h"
static uv_signal_t sint, spipe, shup, squit, sterm, swinch;
@ -72,45 +70,6 @@ void signal_accept_deadly(void)
rejecting_deadly = false;
}
void signal_handle(Event event)
{
int signum = event.data.signum;
switch (signum) {
case SIGINT:
got_int = true;
break;
#ifdef SIGPWR
case SIGPWR:
// Signal of a power failure(eg batteries low), flush the swap files to
// be safe
ml_sync_all(false, false);
break;
#endif
case SIGPIPE:
// Ignore
break;
case SIGWINCH:
shell_resized();
break;
case SIGTERM:
case SIGQUIT:
case SIGHUP:
if (!rejecting_deadly) {
deadly_signal(signum);
}
break;
default:
fprintf(stderr, "Invalid signal %d", signum);
break;
}
}
EventSource signal_event_source(void)
{
return &sint;
}
static char * signal_name(int signum)
{
switch (signum) {
@ -154,20 +113,32 @@ static void deadly_signal(int signum)
static void signal_cb(uv_signal_t *handle, int signum)
{
if (rejecting_deadly) {
if (signum == SIGINT) {
switch (signum) {
case SIGINT:
got_int = true;
}
return;
break;
#ifdef SIGPWR
case SIGPWR:
// Signal of a power failure(eg batteries low), flush the swap files to
// be safe
ml_sync_all(false, false);
break;
#endif
case SIGPIPE:
// Ignore
break;
case SIGWINCH:
shell_resized();
break;
case SIGTERM:
case SIGQUIT:
case SIGHUP:
if (!rejecting_deadly) {
deadly_signal(signum);
}
break;
default:
fprintf(stderr, "Invalid signal %d", signum);
break;
}
Event event = {
.source = signal_event_source(),
.type = kEventSignal,
.data = {
.signum = signum
}
};
event_push(event);
}

View File

@ -1,3 +1,4 @@
#include <assert.h>
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
@ -64,23 +65,6 @@ void os_microdelay(uint64_t microseconds, bool ignoreinput)
}
}
static void microdelay(uint64_t microseconds)
{
uint64_t hrtime;
int64_t ns = microseconds * 1000; // convert to nanoseconds
uv_mutex_lock(&delay_mutex);
while (ns > 0) {
hrtime = uv_hrtime();
if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns) == UV_ETIMEDOUT)
break;
ns -= uv_hrtime() - hrtime;
}
uv_mutex_unlock(&delay_mutex);
}
/// Portable version of POSIX localtime_r()
///
/// @return NULL in case of error
@ -112,3 +96,23 @@ struct tm *os_get_localtime(struct tm *result) FUNC_ATTR_NONNULL_ALL
time_t rawtime = time(NULL);
return os_localtime_r(&rawtime, result);
}
static void microdelay(uint64_t microseconds)
{
uint64_t elapsed = 0;
uint64_t ns = microseconds * 1000; // convert to nanoseconds
uint64_t base = uv_hrtime();
uv_mutex_lock(&delay_mutex);
while (elapsed < ns) {
if (uv_cond_timedwait(&delay_cond, &delay_mutex, ns - elapsed)
== UV_ETIMEDOUT)
break;
uint64_t now = uv_hrtime();
elapsed += now - base;
base = now;
}
uv_mutex_unlock(&delay_mutex);
}

View File

@ -54,8 +54,8 @@
#include "nvim/os/shell.h"
#include "nvim/os/signal.h"
#include "nvim/os/job.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/msgpack_rpc/helpers.h"
#include "nvim/msgpack_rpc/defs.h"
#if defined(HAVE_SYS_IOCTL_H)
# include <sys/ioctl.h>
@ -166,8 +166,6 @@ void mch_init(void)
mac_conv_init();
#endif
msgpack_rpc_init();
msgpack_rpc_helpers_init();
event_init();
}

View File

@ -23,6 +23,15 @@ end
describe('system()', function()
before_each(clear)
it('sets the v:shell_error variable', function()
eval([[system("sh -c 'exit'")]])
eq(0, eval('v:shell_error'))
eval([[system("sh -c 'exit 1'")]])
eq(1, eval('v:shell_error'))
eval([[system("sh -c 'exit 5'")]])
eq(5, eval('v:shell_error'))
end)
describe('passing no input', function()
it('returns the program output', function()
eq("echoed", eval('system("echo -n echoed")'))
@ -83,6 +92,15 @@ describe('systemlist()', function()
-- string.
before_each(clear)
it('sets the v:shell_error variable', function()
eval([[systemlist("sh -c 'exit'")]])
eq(0, eval('v:shell_error'))
eval([[systemlist("sh -c 'exit 1'")]])
eq(1, eval('v:shell_error'))
eval([[systemlist("sh -c 'exit 5'")]])
eq(5, eval('v:shell_error'))
end)
describe('passing string with linefeed characters as input', function()
it('splits the output on linefeed characters', function()
eq({'abc', 'def', 'ghi'}, eval([[systemlist("cat -", "abc\ndef\nghi")]]))