Merge PR #895 'Core service providers...'

This commit is contained in:
Thiago de Arruda 2014-07-17 12:06:31 -03:00
commit 953d61cbf8
32 changed files with 752 additions and 167 deletions

View File

@ -91,6 +91,7 @@ output:write([[
#include <assert.h>
#include <msgpack.h>
#include "nvim/log.h"
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
@ -133,6 +134,7 @@ for i = 1, #api.functions do
output:write('static Object handle_'..fn.name..'(uint64_t channel_id, msgpack_object *req, Error *error)')
output:write('\n{')
output:write('\n DLOG("Received msgpack-rpc call to '..fn.name..'(request id: %" PRIu64 ")", req->via.array.ptr[1].via.u64);')
-- Declare/initialize variables that will hold converted arguments
for j = 1, #fn.parameters do
local param = fn.parameters[j]

View File

@ -22,6 +22,7 @@ set nvim_id $spawn_id
send {
:function BeforeEachTest()
set all&
let &initpython = 'python -c "import neovim; neovim.start_host()"'
redir => groups
silent augroup
redir END

View File

@ -51,6 +51,7 @@ set(CONV_SRCS
os/rstream.c
os/signal.c
os/users.c
os/provider.c
os/uv_helpers.c
os/wstream.c
os/msgpack_rpc.c

View File

@ -109,6 +109,7 @@ StringArray buffer_get_slice(Buffer buffer,
}
start = normalize_index(buf, start) + (include_start ? 0 : 1);
include_end = include_end || (end >= buf->b_ml.ml_line_count);
end = normalize_index(buf, end) + (include_end ? 1 : 0);
if (start >= end) {
@ -169,6 +170,7 @@ void buffer_set_slice(Buffer buffer,
}
start = normalize_index(buf, start) + (include_start ? 0 : 1);
include_end = include_end || (end >= buf->b_ml.ml_line_count);
end = normalize_index(buf, end) + (include_end ? 1 : 0);
if (start > end) {

View File

@ -10,6 +10,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/api/buffer.h"
#include "nvim/os/channel.h"
#include "nvim/os/provider.h"
#include "nvim/vim.h"
#include "nvim/buffer.h"
#include "nvim/window.h"
@ -503,6 +504,22 @@ void vim_unsubscribe(uint64_t channel_id, String event)
channel_unsubscribe(channel_id, e);
}
/// Registers the channel as the provider for `method`. This fails if
/// a provider for `method` is already registered.
///
/// @param channel_id The channel id
/// @param method The method name
/// @param[out] err Details of an error that may have occurred
void vim_register_provider(uint64_t channel_id, String method, Error *err)
{
char buf[METHOD_MAXLEN];
xstrlcpy(buf, method.data, sizeof(buf));
if (!provider_register(buf, channel_id)) {
set_api_error("Provider already registered", err);
}
}
/// Writes a message to vim output or error buffer. The string is split
/// and flushed after each newline. Incomplete lines are kept for writing
/// later.
@ -512,23 +529,24 @@ void vim_unsubscribe(uint64_t channel_id, String event)
/// `emsg` instead of `msg` to print each line)
static void write_msg(String message, bool to_err)
{
static int pos = 0;
static char line_buf[LINE_BUFFER_SIZE];
static int out_pos = 0, err_pos = 0;
static char out_line_buf[LINE_BUFFER_SIZE], err_line_buf[LINE_BUFFER_SIZE];
#define PUSH_CHAR(i, pos, line_buf, msg) \
if (message.data[i] == NL || pos == LINE_BUFFER_SIZE - 1) { \
line_buf[pos] = NUL; \
msg((uint8_t *)line_buf); \
pos = 0; \
continue; \
} \
\
line_buf[pos++] = message.data[i];
for (uint32_t i = 0; i < message.size; i++) {
if (message.data[i] == NL || pos == LINE_BUFFER_SIZE - 1) {
// Flush line
line_buf[pos] = NUL;
if (to_err) {
emsg((uint8_t *)line_buf);
} else {
msg((uint8_t *)line_buf);
}
pos = 0;
continue;
if (to_err) {
PUSH_CHAR(i, err_pos, err_line_buf, emsg);
} else {
PUSH_CHAR(i, out_pos, out_line_buf, msg);
}
line_buf[pos++] = message.data[i];
}
}

View File

@ -944,7 +944,7 @@ doESCkey:
break;
case K_EVENT:
event_process(true);
event_process();
break;
case K_HOME: /* <Home> */

View File

@ -83,8 +83,10 @@
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/os/dl.h"
#include "nvim/os/provider.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@ -6455,6 +6457,7 @@ static struct fst {
{"prevnonblank", 1, 1, f_prevnonblank},
{"printf", 2, 19, f_printf},
{"pumvisible", 0, 0, f_pumvisible},
{"pyeval", 1, 1, f_pyeval},
{"range", 1, 3, f_range},
{"readfile", 1, 3, f_readfile},
{"reltime", 0, 2, f_reltime},
@ -9807,6 +9810,10 @@ static void f_has(typval_T *argvars, typval_T *rettv)
}
}
if (n == FALSE && provider_has_feature((char *)name)) {
n = TRUE;
}
rettv->vval.v_number = n;
}
@ -10560,6 +10567,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
strlen((char *)argvars[1].vval.v_string),
1,
free);
rettv->vval.v_number = job_write(job, buf);
}
@ -11454,7 +11462,13 @@ static void f_pumvisible(typval_T *argvars, typval_T *rettv)
rettv->vval.v_number = 1;
}
/*
* "pyeval()" function
*/
static void f_pyeval(typval_T *argvars, typval_T *rettv)
{
script_host_eval("python_eval", argvars, rettv);
}
/*
* "range()" function
@ -19084,6 +19098,7 @@ static void on_job_stderr(RStream *rstream, void *data, bool eof)
static void on_job_exit(Job *job, void *data)
{
apply_job_autocmds(job, data, "exit", NULL);
free(data);
}
static void on_job_data(RStream *rstream, void *data, bool eof, char *type)
@ -19118,3 +19133,20 @@ static void apply_job_autocmds(Job *job, char *name, char *type, char *str)
apply_autocmds(EVENT_JOBACTIVITY, (uint8_t *)name, NULL, TRUE, NULL);
}
static void script_host_eval(char *method, typval_T *argvars, typval_T *rettv)
{
Object result = provider_call(method, vim_to_object(argvars));
if (result.type == kObjectTypeNil) {
return;
}
Error err = {.set = false};
object_to_vim(result, rettv, &err);
msgpack_rpc_free_object(result);
if (err.set) {
EMSG("Error converting value back to vim");
}
}

View File

@ -54,6 +54,10 @@
#include "nvim/os/os.h"
#include "nvim/os/shell.h"
#include "nvim/os/fs_defs.h"
#include "nvim/os/provider.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
/* Growarray to store info about already sourced scripts.
@ -785,6 +789,22 @@ void ex_profile(exarg_T *eap)
}
}
void ex_python(exarg_T *eap)
{
script_host_execute("python_execute", eap);
}
void ex_pyfile(exarg_T *eap)
{
script_host_execute_file("python_execute_file", eap);
}
void ex_pydo(exarg_T *eap)
{
script_host_do_range("python_do_range", eap);
}
/* Command line expansion for :profile. */
static enum {
PEXP_SUBCMD, /* expand :profile sub-commands */
@ -3219,3 +3239,40 @@ char_u *get_locales(expand_T *xp, int idx)
}
#endif
static void script_host_execute(char *method, exarg_T *eap)
{
char *script = (char *)script_get(eap, eap->arg);
if (!eap->skip) {
String str = cstr_to_string(script ? script : (char *)eap->arg);
Object result = provider_call(method, STRING_OBJ(str));
// We don't care about the result, so free it just in case a bad provider
// returned something
msgpack_rpc_free_object(result);
}
free(script);
}
static void script_host_execute_file(char *method, exarg_T *eap)
{
char buffer[MAXPATHL];
vim_FullName(eap->arg, (uint8_t *)buffer, sizeof(buffer), false);
String file = cstr_to_string(buffer);
Object result = provider_call(method, STRING_OBJ(file));
msgpack_rpc_free_object(result);
}
static void script_host_do_range(char *method, exarg_T *eap)
{
Array arg = {0, 0, 0};
ADD(arg, INTEGER_OBJ(eap->line1));
ADD(arg, INTEGER_OBJ(eap->line2));
ADD(arg, STRING_OBJ(cstr_to_string((char *)eap->arg)));
Object result = provider_call(method, ARRAY_OBJ(arg));
msgpack_rpc_free_object(result);
}

View File

@ -737,6 +737,12 @@ enum CMD_index
RANGE|WHOLEFOLD|BANG|REGSTR|TRLBAR|ZEROR|CMDWIN|MODIFY),
EX(CMD_pwd, "pwd", ex_pwd,
TRLBAR|CMDWIN),
EX(CMD_python, "python", ex_python,
RANGE|EXTRA|NEEDARG|CMDWIN),
EX(CMD_pydo, "pydo", ex_pydo,
RANGE|DFLALL|EXTRA|NEEDARG|CMDWIN),
EX(CMD_pyfile, "pyfile", ex_pyfile,
RANGE|FILE1|NEEDARG|CMDWIN),
EX(CMD_quit, "quit", ex_quit,
BANG|TRLBAR|CMDWIN),
EX(CMD_quitall, "quitall", ex_quit_all,

View File

@ -1859,6 +1859,7 @@ static char_u * do_one_cmd(char_u **cmdlinep,
case CMD_noautocmd:
case CMD_noswapfile:
case CMD_psearch:
case CMD_python:
case CMD_return:
case CMD_rightbelow:
case CMD_silent:

View File

@ -762,7 +762,7 @@ getcmdline (
*/
switch (c) {
case K_EVENT:
event_process(true);
event_process();
// Force a redraw even though the command line didn't change
shell_resized();
goto cmdline_not_changed;
@ -1878,7 +1878,7 @@ redraw:
if (IS_SPECIAL(c1)) {
// Process deferred events
event_process(true);
event_process();
// Ignore other special key codes
continue;
}

View File

@ -49,6 +49,7 @@
#include "nvim/term.h"
#include "nvim/ui.h"
#include "nvim/undo.h"
#include "nvim/os/event.h"
/*
* These buffers are used for storing:
@ -2472,6 +2473,7 @@ inchar (
char_u dum[DUM_LEN + 1];
for (;; ) {
event_process();
len = ui_inchar(dum, DUM_LEN, 0L, 0);
if (len == 0 || (len == 1 && dum[0] == 3))
break;

View File

@ -19,9 +19,9 @@
#endif
// MIN_LOG_LEVEL can be defined during compilation to adjust the desired level
// of logging. DEBUG_LOG_LEVEL is used by default.
// of logging. INFO_LOG_LEVEL is used by default.
#ifndef MIN_LOG_LEVEL
# define MIN_LOG_LEVEL DEBUG_LOG_LEVEL
# define MIN_LOG_LEVEL INFO_LOG_LEVEL
#endif
#ifndef DISABLE_LOG

View File

@ -87,6 +87,7 @@
return rv; \
}
MAP_IMPL(cstr_t, uint64_t, DEFAULT_INITIALIZER)
MAP_IMPL(cstr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(ptr_t, ptr_t, DEFAULT_INITIALIZER)
MAP_IMPL(uint64_t, ptr_t, DEFAULT_INITIALIZER)

View File

@ -19,6 +19,7 @@
U map_##T##_##U##_put(Map(T, U) *map, T key, U value); \
U map_##T##_##U##_del(Map(T, U) *map, T key);
MAP_DECLS(cstr_t, uint64_t)
MAP_DECLS(cstr_t, ptr_t)
MAP_DECLS(ptr_t, ptr_t)
MAP_DECLS(uint64_t, ptr_t)

View File

@ -2074,7 +2074,7 @@ static int do_more_prompt(int typed_char)
toscroll = 0;
switch (c) {
case K_EVENT:
event_process(true);
event_process();
break;
case BS: /* scroll one line back */
case K_BS:
@ -2734,8 +2734,11 @@ do_dialog (
retval = 0;
break;
default: /* Could be a hotkey? */
if (c < 0) /* special keys are ignored here */
if (c < 0) { /* special keys are ignored here */
// drain event queue to prevent infinite loop
event_process();
continue;
}
if (c == ':' && ex_cmd) {
retval = dfltbutton;
ins_char_typebuf(':');

View File

@ -923,6 +923,7 @@ getcount:
/* Adjust the register according to 'clipboard', so that when
* "unnamed" is present it becomes '*' or '+' instead of '"'. */
adjust_clipboard_register(&regname);
set_reg_var(regname);
}
}
@ -5101,6 +5102,7 @@ static void nv_brackets(cmdarg_T *cap)
end = equalpos(start, VIsual) ? curwin->w_cursor : VIsual;
curwin->w_cursor = (dir == BACKWARD ? start : end);
}
adjust_clipboard_register(&regname);
prep_redo_cmd(cap);
do_put(regname, dir, cap->count1, PUT_FIXINDENT);
if (was_visual) {
@ -7267,9 +7269,10 @@ static void nv_put(cmdarg_T *cap)
*/
was_visual = TRUE;
regname = cap->oap->regname;
bool adjusted = adjust_clipboard_register(&regname);
if (regname == 0 || regname == '"'
|| VIM_ISDIGIT(regname) || regname == '-'
|| adjusted
) {
/* The delete is going to overwrite the register we want to
* put, save it first. */
@ -7372,5 +7375,5 @@ static void nv_cursorhold(cmdarg_T *cap)
static void nv_event(cmdarg_T *cap)
{
event_process(true);
event_process();
}

View File

@ -47,6 +47,9 @@
#include "nvim/ui.h"
#include "nvim/undo.h"
#include "nvim/window.h"
#include "nvim/os/provider.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/api/private/helpers.h"
/*
* Registers:
@ -55,8 +58,9 @@
* 10..35 = registers 'a' to 'z'
* 36 = delete register '-'
*/
#define NUM_REGISTERS 37
#define NUM_REGISTERS 38
#define DELETION_REGISTER 36
#define CLIP_REGISTER 37
/*
* Each yank register is an array of pointers to lines.
@ -711,6 +715,8 @@ valid_yank_reg (
|| regname == '"'
|| regname == '-'
|| regname == '_'
|| regname == '*'
|| regname == '+'
)
return TRUE;
return FALSE;
@ -743,6 +749,8 @@ void get_yank_register(int regname, int writing)
y_append = TRUE;
} else if (regname == '-')
i = DELETION_REGISTER;
else if (regname == '*' || regname == '+')
i = CLIP_REGISTER;
else /* not 0-9, a-z, A-Z or '-': use register 0 */
i = 0;
y_current = &(y_regs[i]);
@ -762,6 +770,7 @@ get_register (
) FUNC_ATTR_NONNULL_RET
{
get_yank_register(name, 0);
get_clipboard(name);
struct yankreg *reg = xmalloc(sizeof(struct yankreg));
*reg = *y_current;
@ -789,7 +798,7 @@ void put_register(int name, void *reg)
free_yank_all();
*y_current = *(struct yankreg *)reg;
free(reg);
set_clipboard(name);
}
/*
@ -929,6 +938,7 @@ do_execreg (
}
execreg_lastc = regname;
get_clipboard(regname);
if (regname == '_') /* black hole: don't stuff anything */
return OK;
@ -1093,6 +1103,7 @@ insert_reg (
if (regname != NUL && !valid_yank_reg(regname, FALSE))
return FAIL;
get_clipboard(regname);
if (regname == '.') /* insert last inserted text */
retval = stuff_inserted(NUL, 1L, TRUE);
@ -1278,6 +1289,17 @@ cmdline_paste_reg (
return OK;
}
bool adjust_clipboard_register(int *rp)
{
// If no reg. specified and 'unnamedclip' is set, use the
// clipboard register.
if (*rp == 0 && p_unc && provider_has_feature("clipboard")) {
*rp = '+';
return true;
}
return false;
}
/*
* Handle a delete operation.
@ -1307,6 +1329,7 @@ int op_delete(oparg_T *oap)
return FAIL;
}
bool adjusted = adjust_clipboard_register(&oap->regname);
if (has_mbyte)
mb_adjust_opend(oap);
@ -1389,6 +1412,7 @@ int op_delete(oparg_T *oap)
/* Yank into small delete register when no named register specified
* and the delete is within one line. */
if ((
adjusted ||
oap->regname == 0) && oap->motion_type != MLINE
&& oap->line_count == 1) {
oap->regname = '-';
@ -2336,7 +2360,6 @@ int op_yank(oparg_T *oap, int deleting, int mess)
if (oap->regname == '_') /* black hole: nothing to do */
return OK;
if (!deleting) /* op_delete() already set y_current */
get_yank_register(oap->regname, TRUE);
@ -2519,6 +2542,8 @@ int op_yank(oparg_T *oap, int deleting, int mess)
curbuf->b_op_end.col = MAXCOL;
}
set_clipboard(oap->regname);
return OK;
}
@ -2581,6 +2606,8 @@ do_put (
int allocated = FALSE;
long cnt;
adjust_clipboard_register(&regname);
get_clipboard(regname);
if (flags & PUT_FIXINDENT)
orig_indent = get_indent();
@ -3171,6 +3198,8 @@ void ex_display(exarg_T *eap)
)
continue; /* did not ask for this register */
adjust_clipboard_register(&name);
get_clipboard(name);
if (i == -1) {
if (y_previous != NULL)
@ -4528,6 +4557,9 @@ void write_viminfo_registers(FILE *fp)
for (i = 0; i < NUM_REGISTERS; i++) {
if (y_regs[i].y_array == NULL)
continue;
// Skip '*'/'+' register, we don't want them back next time
if (i == CLIP_REGISTER)
continue;
/* Skip empty registers. */
num_lines = y_regs[i].y_size;
if (num_lines == 0
@ -4607,6 +4639,7 @@ char_u get_reg_type(int regname, long *reglen)
return MCHAR;
}
get_clipboard(regname);
if (regname != NUL && !valid_yank_reg(regname, FALSE))
return MAUTO;
@ -4654,6 +4687,7 @@ get_reg_contents (
if (regname != NUL && !valid_yank_reg(regname, FALSE))
return NULL;
get_clipboard(regname);
if (get_spec_reg(regname, &retval, &allocated, FALSE)) {
if (retval == NULL)
@ -5162,3 +5196,88 @@ void cursor_pos_info(void)
}
}
static void free_register(struct yankreg *reg)
{
// Save 'y_current' into 'curr'
struct yankreg *curr = y_current;
// Set it to 'y_current' since 'free_yank_all' operates on it
y_current = reg;
free_yank_all();
// Restore 'y_current'
y_current = curr;
}
static void copy_register(struct yankreg *dest, struct yankreg *src)
{
free_register(dest);
*dest = *src;
dest->y_array = xcalloc(src->y_size, sizeof(uint8_t *));
for (int j = 0; j < src->y_size; ++j) {
dest->y_array[j] = (uint8_t *)xstrdup((char *)src->y_array[j]);
}
}
static void get_clipboard(int name)
{
if (!(name == '*' || name == '+'
|| (p_unc && !name && provider_has_feature("clipboard")))) {
return;
}
struct yankreg *reg = &y_regs[CLIP_REGISTER];
free_register(reg);
Object result = provider_call("clipboard_get", NIL);
if (result.type != kObjectTypeArray) {
goto err;
}
Array lines = result.data.array;
reg->y_array = xcalloc(lines.size, sizeof(uint8_t *));
reg->y_size = lines.size;
for (size_t i = 0; i < lines.size; i++) {
if (lines.items[i].type != kObjectTypeString) {
goto err;
}
reg->y_array[i] = (uint8_t *)lines.items[i].data.string.data;
}
if (!name && p_unc) {
// copy to the unnamed register
copy_register(&y_regs[0], reg);
}
return;
err:
msgpack_rpc_free_object(result);
free(reg->y_array);
reg->y_array = NULL;
reg->y_size = 0;
EMSG("Clipboard provider returned invalid data");
}
static void set_clipboard(int name)
{
if (!(name == '*' || name == '+'
|| (p_unc && !name && provider_has_feature("clipboard")))) {
return;
}
struct yankreg *reg = &y_regs[CLIP_REGISTER];
if (!name && p_unc) {
// copy from the unnamed register
copy_register(reg, &y_regs[0]);
}
Array lines = {0, 0, 0};
for (int i = 0; i < reg->y_size; i++) {
ADD(lines, STRING_OBJ(cstr_to_string((char *)reg->y_array[i])));
}
Object result = provider_call("clipboard_set", ARRAY_OBJ(lines));
msgpack_rpc_free_object(result);
}

View File

@ -963,6 +963,12 @@ static struct vimoption
{"infercase", "inf", P_BOOL|P_VI_DEF,
(char_u *)&p_inf, PV_INF,
{(char_u *)FALSE, (char_u *)0L} SCRIPTID_INIT},
{"initclipboard","icpb",P_STRING|P_VI_DEF|P_SECURE,
(char_u *)&p_icpb, PV_NONE,
{(char_u *)NULL, (char_u *)0L} SCRIPTID_INIT},
{"initpython","ipy",P_STRING|P_VI_DEF|P_SECURE,
(char_u *)&p_ipy, PV_NONE,
{(char_u *)NULL, (char_u *)0L} SCRIPTID_INIT},
{"insertmode", "im", P_BOOL|P_VI_DEF|P_VIM,
(char_u *)&p_im, PV_NONE,
{(char_u *)FALSE, (char_u *)0L} SCRIPTID_INIT},
@ -1625,6 +1631,9 @@ static struct vimoption
{"undoreload", "ur", P_NUM|P_VI_DEF,
(char_u *)&p_ur, PV_NONE,
{ (char_u *)10000L, (char_u *)0L} SCRIPTID_INIT},
{"unnamedclip", "ucp", P_BOOL|P_VI_DEF|P_VIM,
(char_u *)&p_unc, PV_NONE,
{(char_u *)FALSE, (char_u *)FALSE} SCRIPTID_INIT},
{"updatecount", "uc", P_NUM|P_VI_DEF,
(char_u *)&p_uc, PV_NONE,
{(char_u *)200L, (char_u *)0L} SCRIPTID_INIT},

View File

@ -586,6 +586,7 @@ static char *(p_ttym_values[]) =
EXTERN char_u *p_udir; /* 'undodir' */
EXTERN long p_ul; /* 'undolevels' */
EXTERN long p_ur; /* 'undoreload' */
EXTERN int p_unc; /* 'unnamedclip' */
EXTERN long p_uc; /* 'updatecount' */
EXTERN long p_ut; /* 'updatetime' */
EXTERN char_u *p_fcs; /* 'fillchar' */
@ -630,6 +631,8 @@ EXTERN int p_write; /* 'write' */
EXTERN int p_wa; /* 'writeany' */
EXTERN int p_wb; /* 'writebackup' */
EXTERN long p_wd; /* 'writedelay' */
EXTERN char *p_ipy; // 'initpython'
EXTERN char *p_icpb; // 'initclipboard'
/*
* "indir" values for buffer-local opions.

View File

@ -6,6 +6,7 @@
#include <msgpack.h>
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/os/channel.h"
#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
@ -17,9 +18,11 @@
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/lib/kvec.h"
typedef struct {
@ -81,7 +84,8 @@ void channel_teardown(void)
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
bool channel_from_job(char **argv)
/// @return The channel id
uint64_t channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
@ -91,17 +95,17 @@ bool channel_from_job(char **argv)
channel,
job_out,
job_err,
job_exit,
NULL,
true,
0,
&status);
if (status <= 0) {
close_channel(channel);
return false;
return 0;
}
return true;
return channel->id;
}
/// Creates an API channel from a libuv stream representing a tcp or
@ -114,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream)
stream->data = NULL;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true);
channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
@ -123,6 +127,13 @@ void channel_from_stream(uv_stream_t *stream)
channel->data.streams.uv = stream;
}
bool channel_exists(uint64_t id)
{
Channel *channel;
return (channel = pmap_get(uint64_t)(channels, id)) != NULL
&& channel->enabled;
}
/// Sends event/data to channel
///
/// @param id The channel id. If 0, the event will be sent to all
@ -135,7 +146,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id))) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@ -155,7 +166,7 @@ bool channel_send_call(uint64_t id,
{
Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@ -170,22 +181,18 @@ bool channel_send_call(uint64_t id,
"while processing a RPC call",
channel->id);
*result = STRING_OBJ(cstr_to_string(buf));
msgpack_rpc_free_object(arg);
return false;
}
uint64_t request_id = channel->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, name, arg);
if (!kv_size(channel->call_stack)) {
// This is the first frame, we must disable event deferral for this
// channel because we won't be returning until the client sends a
// response
if (channel->is_job) {
job_set_defer(channel->data.job, false);
} else {
rstream_set_defer(channel->data.streams.read, false);
}
}
EventSource channel_source = channel->is_job
? job_event_source(channel->data.job)
: rstream_event_source(channel->data.streams.read);
EventSource sources[] = {channel_source, NULL};
// Push the frame
ChannelCallFrame frame = {request_id, false, NIL};
@ -193,24 +200,18 @@ bool channel_send_call(uint64_t id,
size_t size = kv_size(channel->call_stack);
do {
event_poll(-1);
event_poll(-1, sources);
} while (
// Continue running if ...
channel->enabled && // the channel is still enabled
kv_size(channel->call_stack) >= size); // the call didn't return
if (!kv_size(channel->call_stack)) {
// Popped last frame, restore event deferral
if (channel->is_job) {
job_set_defer(channel->data.job, true);
} else {
rstream_set_defer(channel->data.streams.read, true);
}
if (!channel->enabled && !channel->rpc_call_level) {
if (!(kv_size(channel->call_stack)
|| channel->enabled
|| channel->rpc_call_level)) {
// Close the channel if it has been disabled and we have not been called
// by `parse_msgpack`(It would be unsafe to close the channel otherwise)
close_channel(channel);
}
}
*errored = frame.errored;
@ -227,7 +228,7 @@ void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@ -249,7 +250,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@ -264,12 +265,15 @@ static void job_out(RStream *rstream, void *data, bool eof)
static void job_err(RStream *rstream, void *data, bool eof)
{
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
size_t count;
char buf[256];
Channel *channel = job_data(data);
static void job_exit(Job *job, void *data)
{
// TODO(tarruda): what should be done here?
while ((count = rstream_available(rstream))) {
size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
buf[read] = NUL;
ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
}
}
static void parse_msgpack(RStream *rstream, void *data, bool eof)
@ -283,12 +287,15 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
disable_channel(channel, buf);
call_set_error(channel, buf);
return;
}
channel->rpc_call_level++;
uint32_t count = rstream_available(rstream);
DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
count,
rstream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@ -313,7 +320,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
" a matching id for the current RPC call. Ensure the client "
" is properly synchronized",
channel->id);
call_stack_unwind(channel, buf, 1);
call_set_error(channel, buf);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
@ -366,7 +373,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed due to a failed write",
channel->id);
disable_channel(channel, buf);
call_set_error(channel, buf);
}
return success;
@ -383,7 +390,7 @@ static void send_request(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
channel_write(channel, serialize_request(id, method, arg, &out_buffer));
channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1));
}
static void send_event(Channel *channel,
@ -391,7 +398,7 @@ static void send_event(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
channel_write(channel, serialize_request(0, method, arg, &out_buffer));
channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1));
}
static void broadcast_event(char *name, Object arg)
@ -412,7 +419,11 @@ static void broadcast_event(char *name, Object arg)
}
String method = {.size = strlen(name), .data = name};
WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
WBuffer *buffer = serialize_request(0,
method,
arg,
&out_buffer,
kv_size(subscribed));
for (size_t i = 0; i < kv_size(subscribed); i++) {
channel_write(kv_A(subscribed, i), buffer);
@ -443,6 +454,15 @@ static void close_channel(Channel *channel)
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
@ -453,14 +473,6 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
free(channel);
}
@ -503,10 +515,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
static void call_stack_pop(msgpack_object *obj, Channel *channel)
{
ChannelCallFrame *frame = kv_A(channel->call_stack,
kv_size(channel->call_stack) - 1);
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
(void)kv_pop(channel->call_stack);
if (frame->errored) {
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
@ -515,24 +525,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
}
}
static void call_stack_unwind(Channel *channel, char *msg, int count)
static void call_set_error(Channel *channel, char *msg)
{
while (kv_size(channel->call_stack) && count--) {
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
}
static void disable_channel(Channel *channel, char *msg)
{
if (kv_size(channel->call_stack)) {
// Channel is currently in the middle of a call, remove all frames and mark
// it as "dead"
channel->enabled = false;
call_stack_unwind(channel, msg, -1);
} else {
// Safe to close it now
close_channel(channel);
}
channel->enabled = false;
}

View File

@ -9,6 +9,7 @@
#include "nvim/os/input.h"
#include "nvim/os/channel.h"
#include "nvim/os/server.h"
#include "nvim/os/provider.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream.h"
#include "nvim/os/job.h"
@ -32,6 +33,11 @@ typedef struct {
# include "os/event.c.generated.h"
#endif
static klist_t(Event) *deferred_events, *immediate_events;
// NULL-terminated array of event sources that we should process immediately.
//
// Events from sources that are not contained in this array are processed
// later when `event_process` is called
static EventSource *immediate_sources = NULL;
void event_init(void)
{
@ -50,6 +56,8 @@ void event_init(void)
channel_init();
// Servers
server_init();
// Providers
provider_init();
}
void event_teardown(void)
@ -60,7 +68,8 @@ void event_teardown(void)
}
// Wait for some event
bool event_poll(int32_t ms)
bool event_poll(int32_t ms, EventSource sources[])
FUNC_ATTR_NONNULL_ARG(2)
{
uv_run_mode run_mode = UV_RUN_ONCE;
@ -91,16 +100,15 @@ bool event_poll(int32_t ms)
run_mode = UV_RUN_NOWAIT;
}
bool events_processed;
size_t processed_events;
do {
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
uv_run(uv_default_loop(), run_mode);
events_processed = event_process(false);
processed_events = loop(run_mode, sources);
} while (
// Continue running if ...
!events_processed && // we didn't process any immediate events
!processed_events && // we didn't process any immediate events
!event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
!timer_data.timed_out); // we didn't get a timeout
@ -115,32 +123,49 @@ bool event_poll(int32_t ms)
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
event_process(false);
processed_events += loop(UV_RUN_NOWAIT, sources);
}
return !timer_data.timed_out && (events_processed || event_has_deferred());
return !timer_data.timed_out && (processed_events || event_has_deferred());
}
bool event_has_deferred(void)
{
return !kl_empty(get_queue(true));
return !kl_empty(deferred_events);
}
// Push an event to the queue
void event_push(Event event, bool deferred)
// Queue an event
void event_push(Event event)
{
*kl_pushp(Event, get_queue(deferred)) = event;
bool defer = true;
if (immediate_sources) {
size_t i;
EventSource src;
for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
if (src == event.source) {
defer = false;
break;
}
}
}
*kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
}
void event_process(void)
{
process_from(deferred_events);
}
// Runs the appropriate action for each queued event
bool event_process(bool deferred)
static size_t process_from(klist_t(Event) *queue)
{
bool processed_events = false;
size_t count = 0;
Event event;
while (kl_shift(Event, get_queue(deferred), &event) == 0) {
processed_events = true;
while (kl_shift(Event, queue, &event) == 0) {
switch (event.type) {
case kEventSignal:
signal_handle(event);
@ -154,9 +179,12 @@ bool event_process(bool deferred)
default:
abort();
}
count++;
}
return processed_events;
DLOG("Processed %u events", count);
return count;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
@ -174,7 +202,42 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle);
}
static klist_t(Event) *get_queue(bool deferred)
static void requeue_deferred_events(void)
{
return deferred ? deferred_events : immediate_events;
size_t remaining = deferred_events->size;
DLOG("Number of deferred events: %u", remaining);
while (remaining--) {
// Re-push each deferred event to ensure it will be in the right queue
Event event;
kl_shift(Event, deferred_events, &event);
event_push(event);
DLOG("Re-queueing event");
}
DLOG("Number of deferred events: %u", deferred_events->size);
}
static size_t loop(uv_run_mode run_mode, EventSource *sources)
{
size_t count;
immediate_sources = sources;
// It's possible that some events from the immediate sources are waiting
// in the deferred queue. If so, move them to the immediate queue so they
// will be processed in order of arrival by the next `process_from` call.
requeue_deferred_events();
count = process_from(immediate_events);
if (count) {
// No need to enter libuv, events were already processed
return count;
}
DLOG("Enter event loop");
uv_run(uv_default_loop(), run_mode);
DLOG("Exit event loop");
immediate_sources = NULL;
count = process_from(immediate_events);
return count;
}

View File

@ -6,6 +6,8 @@
#include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h"
typedef void * EventSource;
typedef enum {
kEventSignal,
kEventRStreamData,
@ -13,6 +15,7 @@ typedef enum {
} EventType;
typedef struct {
EventSource source;
EventType type;
union {
int signum;

View File

@ -34,7 +34,7 @@ static bool eof = false, started_reading = false;
void input_init(void)
{
read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false);
read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL);
rstream_set_file(read_stream, read_cmd_fd);
}
@ -129,7 +129,12 @@ bool os_isatty(int fd)
static bool input_poll(int32_t ms)
{
return input_ready() || event_poll(ms) || input_ready();
EventSource input_sources[] = {
rstream_event_source(read_stream),
NULL
};
return input_ready() || event_poll(ms, input_sources) || input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c

View File

@ -214,8 +214,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@ -269,18 +269,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
/// Sets the `defer` flag for a Job instance
///
/// @param rstream The Job id
/// @param defer The new value for the flag
void job_set_defer(Job *job, bool defer)
{
job->defer = defer;
rstream_set_defer(job->out, defer);
rstream_set_defer(job->err, defer);
}
/// Runs the read callback associated with the job exit event
///
/// @param event Object containing data necessary to invoke the callback
@ -307,6 +295,11 @@ void *job_data(Job *job)
return job->data;
}
EventSource job_event_source(Job *job)
{
return job;
}
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@ -391,10 +384,12 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
static void emit_exit_event(Job *job)
{
Event event;
event.type = kEventJobExit;
event.data.job = job;
event_push(event, true);
Event event = {
.source = job_event_source(job),
.type = kEventJobExit,
.data.job = job
};
event_push(event);
}
static void close_cb(uv_handle_t *handle)
@ -408,7 +403,6 @@ static void close_cb(uv_handle_t *handle)
rstream_free(job->err);
wstream_free(job->in);
shell_free_argv(job->proc_opts.args);
free(job->data);
free(job);
}
}

View File

@ -1,9 +1,11 @@
#include <stdint.h>
#include <stdbool.h>
#include <inttypes.h>
#include <msgpack.h>
#include "nvim/vim.h"
#include "nvim/log.h"
#include "nvim/memory.h"
#include "nvim/os/wstream.h"
#include "nvim/os/msgpack_rpc.h"
@ -51,9 +53,14 @@ WBuffer *msgpack_rpc_call(uint64_t channel_id,
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
if (error.set) {
ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
error.msg,
response_id);
return serialize_response(response_id, error.msg, NIL, sbuffer);
}
DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
response_id);
return serialize_response(response_id, NULL, rv, sbuffer);
}
@ -113,7 +120,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
WBuffer *serialize_request(uint64_t request_id,
String method,
Object arg,
msgpack_sbuffer *sbuffer)
msgpack_sbuffer *sbuffer,
size_t refcount)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
@ -130,6 +138,7 @@ WBuffer *serialize_request(uint64_t request_id,
msgpack_rpc_from_object(arg, &pac);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
refcount,
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
@ -165,6 +174,7 @@ WBuffer *serialize_response(uint64_t response_id,
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
1, // responses only go though 1 channel
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
@ -190,6 +200,7 @@ WBuffer *serialize_metadata(uint64_t id,
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
1,
free);
msgpack_sbuffer_clear(sbuffer);
return rv;

215
src/nvim/os/provider.c Normal file
View File

@ -0,0 +1,215 @@
#include <stdint.h>
#include <inttypes.h>
#include <stdbool.h>
#include <assert.h>
#include "nvim/os/provider.h"
#include "nvim/memory.h"
#include "nvim/api/vim.h"
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/os/channel.h"
#include "nvim/os/shell.h"
#include "nvim/os/os.h"
#include "nvim/log.h"
#include "nvim/map.h"
#include "nvim/message.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#define FEATURE_COUNT (sizeof(features) / sizeof(features[0]))
#define FEATURE(feature_name, provider_bootstrap_command, ...) { \
.name = feature_name, \
.bootstrap_command = provider_bootstrap_command, \
.argv = NULL, \
.channel_id = 0, \
.methods = (char *[]){__VA_ARGS__, NULL} \
}
static struct feature {
char *name, **bootstrap_command, **argv, **methods;
size_t name_length;
uint64_t channel_id;
} features[] = {
FEATURE("python",
&p_ipy,
"python_execute",
"python_execute_file",
"python_do_range",
"python_eval"),
FEATURE("clipboard",
&p_icpb,
"clipboard_get",
"clipboard_set")
};
static Map(cstr_t, uint64_t) *registered_providers = NULL;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/provider.c.generated.h"
#endif
void provider_init(void)
{
registered_providers = map_new(cstr_t, uint64_t)();
}
bool provider_has_feature(char *name)
{
for (size_t i = 0; i < FEATURE_COUNT; i++) {
struct feature *f = &features[i];
if (!STRICMP(name, f->name)) {
return f->channel_id || can_execute(f);
}
}
return false;
}
bool provider_available(char *method)
{
return map_has(cstr_t, uint64_t)(registered_providers, method);
}
bool provider_register(char *method, uint64_t channel_id)
{
if (map_has(cstr_t, uint64_t)(registered_providers, method)) {
return false;
}
// First check if this method is part of a feature, and if so, update
// the feature structure with the channel id
struct feature *f = get_feature_for(method);
if (f) {
DLOG("Registering provider for \"%s\" "
"which is part of the \"%s\" feature",
method,
f->name);
f->channel_id = channel_id;
}
map_put(cstr_t, uint64_t)(registered_providers, xstrdup(method), channel_id);
ILOG("Registered channel %" PRIu64 " as the provider for \"%s\"",
channel_id,
method);
return true;
}
Object provider_call(char *method, Object arg)
{
uint64_t channel_id = get_provider_for(method);
if (!channel_id) {
char buf[256];
snprintf(buf,
sizeof(buf),
"Provider for \"%s\" is not available",
method);
report_error(buf);
return NIL;
}
bool error = false;
Object result = NIL;
channel_send_call(channel_id, method, arg, &result, &error);
if (error) {
report_error(result.data.string.data);
msgpack_rpc_free_object(result);
return NIL;
}
return result;
}
static uint64_t get_provider_for(char *method)
{
uint64_t channel_id = map_get(cstr_t, uint64_t)(registered_providers, method);
if (channel_id) {
return channel_id;
}
// Try to bootstrap if the method is part of a feature
struct feature *f = get_feature_for(method);
if (!f || !can_execute(f)) {
ELOG("Cannot bootstrap provider for \"%s\"", method);
goto err;
}
if (f->channel_id) {
ELOG("Already bootstrapped provider for \"%s\"", f->name);
goto err;
}
f->channel_id = channel_from_job(f->argv);
if (!f->channel_id) {
ELOG("The provider for \"%s\" failed to bootstrap", f->name);
goto err;
}
return f->channel_id;
err:
// Ensure we won't try to restart the provider
f->bootstrap_command = NULL;
f->channel_id = 0;
return 0;
}
static bool can_execute(struct feature *f)
{
if (!f->bootstrap_command) {
return false;
}
char *cmd = *f->bootstrap_command;
if (!cmd || !strlen(cmd)) {
return false;
}
if (!f->argv) {
f->argv = shell_build_argv((uint8_t *)cmd, NULL);
}
return os_can_exe((uint8_t *)f->argv[0]);
}
static void report_error(char *str)
{
vim_err_write((String) {.data = str, .size = strlen(str)});
vim_err_write((String) {.data = "\n", .size = 1});
}
static bool feature_has_method(struct feature *f, char *method)
{
size_t i;
char *m;
for (m = f->methods[i = 0]; m; m = f->methods[++i]) {
if (!STRCMP(method, m)) {
return true;
}
}
return false;
}
static struct feature *get_feature_for(char *method)
{
for (size_t i = 0; i < FEATURE_COUNT; i++) {
struct feature *f = &features[i];
if (feature_has_method(f, method)) {
return f;
}
}
return NULL;
}

11
src/nvim/os/provider.h Normal file
View File

@ -0,0 +1,11 @@
#ifndef NVIM_OS_PROVIDER_H
#define NVIM_OS_PROVIDER_H
#include "nvim/api/private/defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/provider.h.generated.h"
#endif
#endif // NVIM_OS_PROVIDER_H

View File

@ -26,7 +26,8 @@ struct rstream {
uv_file fd;
rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos;
bool reading, free_handle, defer;
bool reading, free_handle;
EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@ -40,25 +41,25 @@ struct rstream {
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
/// @param defer Flag that specifies if callback invocation should be deferred
/// to vim main loop(as a KE_EVENT special key)
/// @param source_override Replacement for the default source used in events
/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
size_t buffer_size,
void *data,
bool defer)
EventSource source_override)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
rv->defer = defer;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
rv->source_override = source_override ? source_override : rv;
return rv;
}
@ -213,15 +214,6 @@ size_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
/// Sets the `defer` flag for a a RStream instance
///
/// @param rstream The RStream instance
/// @param defer The new value for the flag
void rstream_set_defer(RStream *rstream, bool defer)
{
rstream->defer = defer;
}
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
@ -232,6 +224,11 @@ void rstream_read_event(Event event)
rstream->cb(rstream, rstream->data, event.data.rstream.eof);
}
EventSource rstream_event_source(RStream *rstream)
{
return rstream->source_override;
}
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@ -260,6 +257,9 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
DLOG("Closing RStream(address: %p, source: %p)",
rstream,
rstream_event_source(rstream));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
@ -274,10 +274,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rstream->wpos += nread;
DLOG("Received %u bytes from RStream(address: %p, source: %p)",
(size_t)cnt,
rstream,
rstream_event_source(rstream));
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
rstream,
rstream_event_source(rstream));
}
rstream->reading = false;
@ -342,9 +349,13 @@ static void close_cb(uv_handle_t *handle)
static void emit_read_event(RStream *rstream, bool eof)
{
Event event;
event.type = kEventRStreamData;
event.data.rstream.ptr = rstream;
event.data.rstream.eof = eof;
event_push(event, rstream->defer);
Event event = {
.source = rstream_event_source(rstream),
.type = kEventRStreamData,
.data.rstream = {
.ptr = rstream,
.eof = eof
}
};
event_push(event);
}

View File

@ -103,6 +103,11 @@ void signal_handle(Event event)
}
}
EventSource signal_event_source(void)
{
return &sint;
}
static char * signal_name(int signum)
{
switch (signum) {
@ -155,10 +160,11 @@ static void signal_cb(uv_signal_t *handle, int signum)
}
Event event = {
.source = signal_event_source(),
.type = kEventSignal,
.data = {
.signum = signum
}
};
event_push(event, true);
event_push(event);
}

View File

@ -42,7 +42,8 @@ typedef struct {
/// Creates a new WStream instance. A WStream encapsulates all the boilerplate
/// necessary for writing to a libuv stream.
///
/// @param maxmem Maximum amount memory used by this `WStream` instance.
/// @param maxmem Maximum amount memory used by this `WStream` instance. If 0,
/// a default value of 10mb will be used.
/// @return The newly-allocated `WStream` instance
WStream * wstream_new(size_t maxmem)
{
@ -91,33 +92,33 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream)
/// @return false if the write failed
bool wstream_write(WStream *wstream, WBuffer *buffer)
{
WriteData *data;
uv_buf_t uvbuf;
uv_write_t *req;
// This should not be called after a wstream was freed
assert(!wstream->freed);
buffer->refcount++;
if (wstream->curmem > wstream->maxmem) {
goto err;
}
wstream->curmem += buffer->size;
data = xmalloc(sizeof(WriteData));
WriteData *data = xmalloc(sizeof(WriteData));
data->wstream = wstream;
data->buffer = buffer;
req = xmalloc(sizeof(uv_write_t));
uv_write_t *req = xmalloc(sizeof(uv_write_t));
req->data = data;
uv_buf_t uvbuf;
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
wstream->pending_reqs++;
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
free(data);
free(req);
goto err;
}
wstream->pending_reqs++;
return true;
err:
@ -132,14 +133,19 @@ err:
///
/// @param data Data stored by the WBuffer
/// @param size The size of the data array
/// @param refcount The number of references for the WBuffer. This will be used
/// by WStream instances to decide when a WBuffer should be freed.
/// @param cb Pointer to function that will be responsible for freeing
/// the buffer data(passing 'free' will work as expected).
/// @return The allocated WBuffer instance
WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb)
WBuffer *wstream_new_buffer(char *data,
size_t size,
size_t refcount,
wbuffer_data_finalizer cb)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
rv->refcount = 0;
rv->refcount = refcount;
rv->cb = cb;
rv->data = data;

View File

@ -9,7 +9,7 @@ STARTTEST
:so small.vim
:set encoding=latin1
:set noswapfile
:if !has('python') | e! test.ok | wq! test.out | endif
:if !has('python') || has('neovim') | e! test.ok | wq! test.out | endif
:lang C
:fun Test()
:py import vim