This commit is contained in:
Thiago de Arruda 2014-06-18 12:16:53 -03:00
commit 090870ca04
27 changed files with 427 additions and 257 deletions

View File

@ -186,7 +186,7 @@ for i = 1, #api.functions do
if #args > 0 then
output:write('channel_id, '..call_args)
else
output:write('channel_id)')
output:write('channel_id')
end
else
output:write(call_args)

View File

@ -5,7 +5,7 @@ if {$argc < 2} {
exit 1
}
set timeout 10
set timeout 60
set run_tests [split [lindex $argv 0] " "]
set run_nvim [split [lindex $argv 1] " "]

View File

@ -31,7 +31,7 @@
/// @return The line count
Integer buffer_get_length(Buffer buffer, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return 0;
@ -100,7 +100,7 @@ StringArray buffer_get_slice(Buffer buffer,
Error *err)
{
StringArray rv = ARRAY_DICT_INIT;
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return rv;
@ -160,7 +160,7 @@ void buffer_set_slice(Buffer buffer,
StringArray replacement,
Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@ -283,7 +283,7 @@ end:
/// @return The variable value
Object buffer_get_var(Buffer buffer, String name, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return (Object) OBJECT_INIT;
@ -301,7 +301,7 @@ Object buffer_get_var(Buffer buffer, String name, Error *err)
/// @return The old value
Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return (Object) OBJECT_INIT;
@ -318,7 +318,7 @@ Object buffer_set_var(Buffer buffer, String name, Object value, Error *err)
/// @return The option value
Object buffer_get_option(Buffer buffer, String name, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return (Object) OBJECT_INIT;
@ -336,7 +336,7 @@ Object buffer_get_option(Buffer buffer, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@ -353,7 +353,7 @@ void buffer_set_option(Buffer buffer, String name, Object value, Error *err)
Integer buffer_get_number(Buffer buffer, Error *err)
{
Integer rv = 0;
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return rv;
@ -370,7 +370,7 @@ Integer buffer_get_number(Buffer buffer, Error *err)
String buffer_get_name(Buffer buffer, Error *err)
{
String rv = STRING_INIT;
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf || buf->b_ffname == NULL) {
return rv;
@ -386,7 +386,7 @@ String buffer_get_name(Buffer buffer, Error *err)
/// @param[out] err Details of an error that may have occurred
void buffer_set_name(Buffer buffer, String name, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@ -416,7 +416,7 @@ void buffer_set_name(Buffer buffer, String name, Error *err)
Boolean buffer_is_valid(Buffer buffer)
{
Error stub = {.set = false};
return find_buffer(buffer, &stub) != NULL;
return find_buffer_by_handle(buffer, &stub) != NULL;
}
/// Inserts a sequence of lines to a buffer at a certain index
@ -440,7 +440,7 @@ void buffer_insert(Buffer buffer, Integer lnum, StringArray lines, Error *err)
Position buffer_get_mark(Buffer buffer, String name, Error *err)
{
Position rv = POSITION_INIT;
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return rv;

View File

@ -49,14 +49,14 @@ typedef struct {
typedef struct {
Object *items;
size_t size;
size_t size, capacity;
} Array;
typedef struct key_value_pair KeyValuePair;
typedef struct {
KeyValuePair *items;
size_t size;
size_t size, capacity;
} Dictionary;
typedef enum {

View File

@ -288,12 +288,7 @@ Object vim_to_object(typval_T *obj)
return rv;
}
/// Finds the pointer for a window number
///
/// @param window the window number
/// @param[out] err Details of an error that may have occurred
/// @return the window pointer
buf_T *find_buffer(Buffer buffer, Error *err)
buf_T *find_buffer_by_handle(Buffer buffer, Error *err)
{
buf_T *rv = handle_get_buffer(buffer);
@ -304,12 +299,7 @@ buf_T *find_buffer(Buffer buffer, Error *err)
return rv;
}
/// Finds the pointer for a window number
///
/// @param window the window number
/// @param[out] err Details of an error that may have occurred
/// @return the window pointer
win_T * find_window(Window window, Error *err)
win_T * find_window_by_handle(Window window, Error *err)
{
win_T *rv = handle_get_window(window);
@ -320,12 +310,7 @@ win_T * find_window(Window window, Error *err)
return rv;
}
/// Finds the pointer for a tabpage number
///
/// @param tabpage the tabpage number
/// @param[out] err Details of an error that may have occurred
/// @return the tabpage pointer
tabpage_T * find_tab(Tabpage tabpage, Error *err)
tabpage_T * find_tab_by_handle(Tabpage tabpage, Error *err)
{
tabpage_T *rv = handle_get_tabpage(tabpage);

View File

@ -6,6 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/lib/kvec.h"
#define set_api_error(message, err) \
do { \
@ -13,6 +14,48 @@
err->set = true; \
} while (0)
#define BOOL_OBJ(b) ((Object) { \
.type = kObjectTypeBoolean, \
.data.boolean = b \
})
#define INTEGER_OBJ(i) ((Object) { \
.type = kObjectTypeInteger, \
.data.integer = i \
})
#define STRING_OBJ(s) ((Object) { \
.type = kObjectTypeString, \
.data.string = cstr_to_string(s) \
})
#define STRINGL_OBJ(d, s) ((Object) { \
.type = kObjectTypeString, \
.data.string = (String) { \
.size = s, \
.data = xmemdup(d, s) \
}})
#define ARRAY_OBJ(a) ((Object) { \
.type = kObjectTypeArray, \
.data.array = a \
})
#define DICTIONARY_OBJ(d) ((Object) { \
.type = kObjectTypeDictionary, \
.data.dictionary = d \
})
#define NIL ((Object) {.type = kObjectTypeNil})
#define PUT(dict, k, v) \
kv_push(KeyValuePair, \
dict, \
((KeyValuePair) {.key = cstr_to_string(k), .value = v}))
#define ADD(array, item) \
kv_push(Object, array, item)
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "api/private/helpers.h.generated.h"
#endif

View File

@ -16,7 +16,7 @@
WindowArray tabpage_get_windows(Tabpage tabpage, Error *err)
{
WindowArray rv = ARRAY_DICT_INIT;
tabpage_T *tab = find_tab(tabpage, err);
tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return rv;
@ -53,7 +53,7 @@ WindowArray tabpage_get_windows(Tabpage tabpage, Error *err)
/// @return The variable value
Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
{
tabpage_T *tab = find_tab(tabpage, err);
tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return (Object) OBJECT_INIT;
@ -71,7 +71,7 @@ Object tabpage_get_var(Tabpage tabpage, String name, Error *err)
/// @return The tab page handle
Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
{
tabpage_T *tab = find_tab(tabpage, err);
tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return (Object) OBJECT_INIT;
@ -88,7 +88,7 @@ Object tabpage_set_var(Tabpage tabpage, String name, Object value, Error *err)
Window tabpage_get_window(Tabpage tabpage, Error *err)
{
Window rv = 0;
tabpage_T *tab = find_tab(tabpage, err);
tabpage_T *tab = find_tab_by_handle(tabpage, err);
if (!tab) {
return rv;
@ -117,6 +117,6 @@ Window tabpage_get_window(Tabpage tabpage, Error *err)
Boolean tabpage_is_valid(Tabpage tabpage)
{
Error stub = {.set = false};
return find_tab(tabpage, &stub) != NULL;
return find_tab_by_handle(tabpage, &stub) != NULL;
}

View File

@ -291,7 +291,7 @@ Buffer vim_get_current_buffer(void)
/// @param[out] err Details of an error that may have occurred
void vim_set_current_buffer(Buffer buffer, Error *err)
{
buf_T *buf = find_buffer(buffer, err);
buf_T *buf = find_buffer_by_handle(buffer, err);
if (!buf) {
return;
@ -348,7 +348,7 @@ Window vim_get_current_window(void)
/// @param handle The window handle
void vim_set_current_window(Window window, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@ -407,7 +407,7 @@ Tabpage vim_get_current_tabpage(void)
/// @param[out] err Details of an error that may have occurred
void vim_set_current_tabpage(Tabpage tabpage, Error *err)
{
tabpage_T *tp = find_tab(tabpage, err);
tabpage_T *tp = find_tab_by_handle(tabpage, err);
if (!tp) {
return;

View File

@ -19,7 +19,7 @@
/// @return The buffer handle
Buffer window_get_buffer(Window window, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return 0;
@ -36,7 +36,7 @@ Buffer window_get_buffer(Window window, Error *err)
Position window_get_cursor(Window window, Error *err)
{
Position rv = {.row = 0, .col = 0};
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (win) {
rv.row = win->w_cursor.lnum;
@ -53,7 +53,7 @@ Position window_get_cursor(Window window, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_cursor(Window window, Position pos, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@ -89,7 +89,7 @@ void window_set_cursor(Window window, Position pos, Error *err)
/// @return the height in rows
Integer window_get_height(Window window, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return 0;
@ -106,7 +106,7 @@ Integer window_get_height(Window window, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_height(Window window, Integer height, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@ -132,7 +132,7 @@ void window_set_height(Window window, Integer height, Error *err)
/// @return the width in columns
Integer window_get_width(Window window, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return 0;
@ -149,7 +149,7 @@ Integer window_get_width(Window window, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_width(Window window, Integer width, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@ -176,7 +176,7 @@ void window_set_width(Window window, Integer width, Error *err)
/// @return The variable value
Object window_get_var(Window window, String name, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return (Object) OBJECT_INIT;
@ -194,7 +194,7 @@ Object window_get_var(Window window, String name, Error *err)
/// @return The old value
Object window_set_var(Window window, String name, Object value, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return (Object) OBJECT_INIT;
@ -211,7 +211,7 @@ Object window_set_var(Window window, String name, Object value, Error *err)
/// @return The option value
Object window_get_option(Window window, String name, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return (Object) OBJECT_INIT;
@ -229,7 +229,7 @@ Object window_get_option(Window window, String name, Error *err)
/// @param[out] err Details of an error that may have occurred
void window_set_option(Window window, String name, Object value, Error *err)
{
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (!win) {
return;
@ -246,7 +246,7 @@ void window_set_option(Window window, String name, Object value, Error *err)
Position window_get_position(Window window, Error *err)
{
Position rv;
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (win) {
rv.col = win->w_wincol;
@ -264,7 +264,7 @@ Position window_get_position(Window window, Error *err)
Tabpage window_get_tabpage(Window window, Error *err)
{
Tabpage rv = 0;
win_T *win = find_window(window, err);
win_T *win = find_window_by_handle(window, err);
if (win) {
rv = win_find_tabpage(win)->handle;
@ -280,6 +280,6 @@ Tabpage window_get_tabpage(Window window, Error *err)
Boolean window_is_valid(Window window)
{
Error stub = {.set = false};
return find_window(window, &stub) != NULL;
return find_window_by_handle(window, &stub) != NULL;
}

View File

@ -939,7 +939,7 @@ doESCkey:
break;
case K_EVENT:
event_process();
event_process(true);
break;
case K_HOME: /* <Home> */

View File

@ -70,6 +70,7 @@
#include "nvim/os/rstream_defs.h"
#include "nvim/os/time.h"
#include "nvim/os/channel.h"
#include "nvim/api/private/helpers.h"
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
@ -10471,11 +10472,13 @@ static void f_job_start(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
rettv->vval.v_number = job_start(argv,
xstrdup((char *)argvars[0].vval.v_string),
on_job_stdout,
on_job_stderr,
on_job_exit);
job_start(argv,
xstrdup((char *)argvars[0].vval.v_string),
on_job_stdout,
on_job_stderr,
on_job_exit,
true,
&rettv->vval.v_number);
if (rettv->vval.v_number <= 0) {
if (rettv->vval.v_number == 0) {
@ -10502,19 +10505,21 @@ static void f_job_stop(typval_T *argvars, typval_T *rettv)
return;
}
if (!job_stop(argvars[0].vval.v_number)) {
Job *job = job_find(argvars[0].vval.v_number);
if (!job) {
// Probably an invalid job id
EMSG(_(e_invjob));
return;
}
job_stop(job);
rettv->vval.v_number = 1;
}
// "jobwrite()" function
static void f_job_write(typval_T *argvars, typval_T *rettv)
{
bool res;
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
@ -10529,16 +10534,17 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
return;
}
res = job_write(argvars[0].vval.v_number,
xstrdup((char *)argvars[1].vval.v_string),
strlen((char *)argvars[1].vval.v_string));
Job *job = job_find(argvars[0].vval.v_number);
if (!res) {
if (!job) {
// Invalid job id
EMSG(_(e_invjob));
}
rettv->vval.v_number = 1;
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
strlen((char *)argvars[1].vval.v_string),
free);
rettv->vval.v_number = job_write(job, buf);
}
/*
@ -12550,7 +12556,7 @@ static void f_send_event(typval_T *argvars, typval_T *rettv)
if (!channel_send_event((uint64_t)argvars[0].vval.v_number,
(char *)argvars[1].vval.v_string,
&argvars[2])) {
vim_to_object(&argvars[2]))) {
EMSG2(_(e_invarg2), "Channel doesn't exist");
return;
}

View File

@ -758,7 +758,7 @@ getcmdline (
*/
switch (c) {
case K_EVENT:
event_process();
event_process(true);
// Force a redraw even though the command line didn't change
shell_resized();
goto cmdline_not_changed;
@ -1873,8 +1873,8 @@ redraw:
}
if (IS_SPECIAL(c1)) {
// Process pending events
event_process();
// Process deferred events
event_process(true);
// Ignore other special key codes
continue;
}

View File

@ -53,39 +53,39 @@ int main() {
#define kv_roundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
#define kvec_t(type) struct { size_t n, m; type *a; }
#define kv_init(v) ((v).n = (v).m = 0, (v).a = 0)
#define kv_destroy(v) free((v).a)
#define kv_A(v, i) ((v).a[(i)])
#define kv_pop(v) ((v).a[--(v).n])
#define kv_size(v) ((v).n)
#define kv_max(v) ((v).m)
#define kvec_t(type) struct { size_t size, capacity; type *items; }
#define kv_init(v) ((v).size = (v).capacity = 0, (v).items = 0)
#define kv_destroy(v) free((v).items)
#define kv_A(v, i) ((v).items[(i)])
#define kv_pop(v) ((v).items[--(v).size])
#define kv_size(v) ((v).size)
#define kv_max(v) ((v).capacity)
#define kv_resize(type, v, s) ((v).m = (s), (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m))
#define kv_resize(type, v, s) ((v).capacity = (s), (v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity))
#define kv_copy(type, v1, v0) do { \
if ((v1).m < (v0).n) kv_resize(type, v1, (v0).n); \
(v1).n = (v0).n; \
memcpy((v1).a, (v0).a, sizeof(type) * (v0).n); \
if ((v1).capacity < (v0).size) kv_resize(type, v1, (v0).size); \
(v1).size = (v0).size; \
memcpy((v1).items, (v0).items, sizeof(type) * (v0).size); \
} while (0) \
#define kv_push(type, v, x) do { \
if ((v).n == (v).m) { \
(v).m = (v).m? (v).m<<1 : 2; \
(v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m); \
if ((v).size == (v).capacity) { \
(v).capacity = (v).capacity? (v).capacity<<1 : 8; \
(v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity); \
} \
(v).a[(v).n++] = (x); \
(v).items[(v).size++] = (x); \
} while (0)
#define kv_pushp(type, v) (((v).n == (v).m)? \
((v).m = ((v).m? (v).m<<1 : 2), \
(v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \
: 0), ((v).a + ((v).n++))
#define kv_pushp(type, v) (((v).size == (v).capacity)? \
((v).capacity = ((v).capacity? (v).capacity<<1 : 8), \
(v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity), 0) \
: 0), ((v).items + ((v).size++))
#define kv_a(type, v, i) (((v).m <= (size_t)(i)? \
((v).m = (v).n = (i) + 1, kv_roundup32((v).m), \
(v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \
: (v).n <= (size_t)(i)? (v).n = (i) + 1 \
: 0), (v).a[(i)])
#define kv_a(type, v, i) (((v).capacity <= (size_t)(i)? \
((v).capacity = (v).size = (i) + 1, kv_roundup32((v).capacity), \
(v).items = (type*)xrealloc((v).items, sizeof(type) * (v).capacity), 0) \
: (v).size <= (size_t)(i)? (v).size = (i) + 1 \
: 0), (v).items[(i)])
#endif

View File

@ -2064,7 +2064,7 @@ static int do_more_prompt(int typed_char)
toscroll = 0;
switch (c) {
case K_EVENT:
event_process();
event_process(true);
break;
case BS: /* scroll one line back */
case K_BS:

View File

@ -7368,5 +7368,5 @@ static void nv_cursorhold(cmdarg_T *cap)
static void nv_event(cmdarg_T *cap)
{
event_process();
event_process(true);
}

View File

@ -24,7 +24,7 @@ typedef struct {
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
union {
int job_id;
Job *job;
struct {
RStream *read;
WStream *write;
@ -68,11 +68,26 @@ void channel_teardown()
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
void channel_from_job(char **argv)
bool channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL);
int status;
channel->data.job = job_start(argv,
channel,
job_out,
job_err,
job_exit,
true,
&status);
if (status <= 0) {
close_channel(channel);
return false;
}
return true;
}
/// Creates an API channel from a libuv stream representing a tcp or
@ -101,12 +116,13 @@ void channel_from_stream(uv_stream_t *stream)
/// @param type The event type, an arbitrary string
/// @param obj The event data
/// @return True if the data was sent successfully, false otherwise.
bool channel_send_event(uint64_t id, char *type, typval_T *data)
bool channel_send_event(uint64_t id, char *type, Object data)
{
Channel *channel = NULL;
if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id))) {
msgpack_rpc_free_object(data);
return false;
}
send_event(channel, type, data);
@ -126,7 +142,7 @@ void channel_subscribe(uint64_t id, char *event)
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
return;
abort();
}
char *event_string = pmap_get(cstr_t)(event_strings, event);
@ -148,7 +164,7 @@ void channel_unsubscribe(uint64_t id, char *event)
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id))) {
return;
abort();
}
unsubscribe(channel, event);
@ -165,6 +181,11 @@ static void job_err(RStream *rstream, void *data, bool eof)
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
static void job_exit(Job *job, void *data)
{
// TODO(tarruda): what should be done here?
}
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
Channel *channel = data;
@ -183,30 +204,57 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked);
UnpackResult result;
msgpack_packer response;
// Deserialize everything we can.
while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
== kUnpackResultOk) {
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
wstream_new_buffer(channel->sbuffer->data,
wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
true));
free));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
if (result == kUnpackResultFail) {
// See src/msgpack/unpack_template.h in msgpack source tree for
// causes for this error(search for 'goto _failed')
//
// A not so uncommon cause for this might be deserializing objects with
// a high nesting level: msgpack will break when it's internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
msgpack_pack_array(&response, 4);
msgpack_pack_int(&response, 1);
msgpack_pack_int(&response, 0);
msgpack_rpc_error("Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting",
&response);
wstream_write(channel->data.streams.write,
wstream_new_buffer(xmemdup(channel->sbuffer->data,
channel->sbuffer->size),
channel->sbuffer->size,
free));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
}
static void send_event(Channel *channel, char *type, typval_T *data)
static void send_event(Channel *channel, char *type, Object data)
{
wstream_write(channel->data.streams.write, serialize_event(type, data));
}
static void broadcast_event(char *type, typval_T *data)
static void broadcast_event(char *type, Object data)
{
kvec_t(Channel *) subscribed;
kv_init(subscribed);
@ -219,6 +267,7 @@ static void broadcast_event(char *type, typval_T *data)
});
if (!kv_size(subscribed)) {
msgpack_rpc_free_object(data);
goto end;
}
@ -255,7 +304,9 @@ static void close_channel(Channel *channel)
msgpack_unpacker_free(channel->unpacker);
if (channel->is_job) {
job_stop(channel->data.job_id);
if (channel->data.job) {
job_stop(channel->data.job);
}
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
@ -278,17 +329,17 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
static WBuffer *serialize_event(char *type, typval_T *data)
static WBuffer *serialize_event(char *type, Object data)
{
String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(event_type, event_data, &packer);
WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data,
msgpack_rpc_notification(event_type, data, &packer);
WBuffer *rv = wstream_new_buffer(xmemdup(msgpack_event_buffer.data,
msgpack_event_buffer.size),
msgpack_event_buffer.size,
true);
msgpack_rpc_free_object(event_data);
free);
msgpack_rpc_free_object(data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return rv;

View File

@ -2,8 +2,8 @@
#define NVIM_OS_CHANNEL_H
#include <uv.h>
#include <msgpack.h>
#include "nvim/api/private/defs.h"
#include "nvim/vim.h"
#define EVENT_MAXLEN 512

View File

@ -21,17 +21,22 @@
#define _destroy_event(x) // do nothing
KLIST_INIT(Event, Event, _destroy_event)
typedef struct {
bool timed_out;
int32_t ms;
uv_timer_t *timer;
} TimerData;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
static klist_t(Event) *event_queue;
static uv_timer_t timer;
static uv_prepare_t timer_prepare;
static klist_t(Event) *deferred_events, *immediate_events;
void event_init()
{
// Initialize the event queue
event_queue = kl_init(Event);
// Initialize the event queues
deferred_events = kl_init(Event);
immediate_events = kl_init(Event);
// Initialize input events
input_init();
// Timer to wake the event loop if a timeout argument is passed to
@ -44,9 +49,6 @@ void event_init()
channel_init();
// Servers
server_init();
uv_timer_init(uv_default_loop(), &timer);
// This prepare handle that actually starts the timer
uv_prepare_init(uv_default_loop(), &timer_prepare);
}
void event_teardown()
@ -59,7 +61,6 @@ void event_teardown()
// Wait for some event
bool event_poll(int32_t ms)
{
bool timed_out;
uv_run_mode run_mode = UV_RUN_ONCE;
if (input_ready()) {
@ -67,15 +68,26 @@ bool event_poll(int32_t ms)
return true;
}
input_start();
timed_out = false;
static int recursive = 0;
if (!(recursive++)) {
// Only needs to start the libuv handle the first time we enter here
input_start();
}
uv_timer_t timer;
uv_prepare_t timer_prepare;
TimerData timer_data = {.ms = ms, .timed_out = false, .timer = &timer};
if (ms > 0) {
uv_timer_init(uv_default_loop(), &timer);
// This prepare handle that actually starts the timer
uv_prepare_init(uv_default_loop(), &timer_prepare);
// Timeout passed as argument to the timer
timer.data = &timed_out;
timer.data = &timer_data;
// We only start the timer after the loop is running, for that we
// use a prepare handle(pass the interval as data to it)
timer_prepare.data = &ms;
timer_prepare.data = &timer_data;
uv_prepare_start(&timer_prepare, timer_prepare_cb);
} else if (ms == 0) {
// For ms == 0, we need to do a non-blocking event poll by
@ -87,40 +99,51 @@ bool event_poll(int32_t ms)
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
uv_run(uv_default_loop(), run_mode);
// Process immediate events outside uv_run since libuv event loop not
// support recursion(processing events may cause a recursive event_poll
// call)
event_process(false);
} while (
// Continue running if ...
!input_ready() && // we have no input
kl_empty(event_queue) && // no events are waiting to be processed
!event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
!timed_out); // we didn't get a timeout
!timer_data.timed_out); // we didn't get a timeout
input_stop();
if (ms > 0) {
// Stop the timer
uv_timer_stop(&timer);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
input_stop();
}
return input_ready() || event_is_pending();
if (ms > 0) {
// Ensure the timer-related handles are closed and run the event loop
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
event_process(false);
}
return input_ready() || event_has_deferred();
}
bool event_is_pending()
bool event_has_deferred()
{
return !kl_empty(event_queue);
return !kl_empty(get_queue(true));
}
// Push an event to the queue
void event_push(Event event)
void event_push(Event event, bool deferred)
{
*kl_pushp(Event, event_queue) = event;
*kl_pushp(Event, get_queue(deferred)) = event;
}
// Runs the appropriate action for each queued event
void event_process()
void event_process(bool deferred)
{
Event event;
while (kl_shift(Event, event_queue, &event) == 0) {
while (kl_shift(Event, get_queue(deferred), &event) == 0) {
switch (event.type) {
case kEventSignal:
signal_handle(event);
@ -140,11 +163,19 @@ void event_process()
// Set a flag in the `event_poll` loop for signaling of a timeout
static void timer_cb(uv_timer_t *handle)
{
*((bool *)handle->data) = true;
TimerData *data = handle->data;
data->timed_out = true;
}
static void timer_prepare_cb(uv_prepare_t *handle)
{
uv_timer_start(&timer, timer_cb, *(uint32_t *)timer_prepare.data, 0);
uv_prepare_stop(&timer_prepare);
TimerData *data = handle->data;
assert(data->ms > 0);
uv_timer_start(data->timer, timer_cb, (uint32_t)data->ms, 0);
uv_prepare_stop(handle);
}
static klist_t(Event) *get_queue(bool deferred)
{
return deferred ? deferred_events : immediate_events;
}

View File

@ -67,7 +67,7 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
{
InbufPollResult result;
if (event_is_pending()) {
if (event_has_deferred()) {
// Return pending event bytes
return push_event_key(buf, maxlen);
}
@ -91,8 +91,8 @@ int os_inchar(uint8_t *buf, int maxlen, int32_t ms, int tb_change_cnt)
}
}
// If there are pending events, return the keys directly
if (event_is_pending()) {
// If there are deferred events, return the keys directly
if (event_has_deferred()) {
return push_event_key(buf, maxlen);
}

View File

@ -37,6 +37,7 @@ struct job {
int pending_closes;
// If the job was already stopped
bool stopped;
bool defer;
// Data associated with the job
void *data;
// Callbacks
@ -128,14 +129,18 @@ void job_teardown()
/// @param stderr_cb Callback that will be invoked when data is available
/// on stderr
/// @param exit_cb Callback that will be invoked when the job exits
/// @return The job id if the job started successfully. If the the first item /
/// of `argv`(the program) could not be executed, -1 will be returned.
// 0 will be returned if the job table is full.
int job_start(char **argv,
void *data,
rstream_cb stdout_cb,
rstream_cb stderr_cb,
job_exit_cb job_exit_cb)
/// @param defer If the job callbacks invocation should be deferred to vim
/// main loop
/// @param[out] The job id if the job started successfully, 0 if the job table
/// is full, -1 if the program could not be executed.
/// @return The job pointer if the job started successfully, NULL otherwise
Job *job_start(char **argv,
void *data,
rstream_cb stdout_cb,
rstream_cb stderr_cb,
job_exit_cb job_exit_cb,
bool defer,
int *status)
{
int i;
Job *job;
@ -149,12 +154,14 @@ int job_start(char **argv,
if (i == MAX_RUNNING_JOBS) {
// No free slots
return 0;
*status = 0;
return NULL;
}
job = xmalloc(sizeof(Job));
// Initialize
job->id = i + 1;
*status = job->id;
job->pending_refs = 3;
job->pending_closes = 4;
job->data = data;
@ -175,6 +182,7 @@ int job_start(char **argv,
job->proc_stdin.data = NULL;
job->proc_stdout.data = NULL;
job->proc_stderr.data = NULL;
job->defer = defer;
// Initialize the job std{in,out,err}
uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
@ -192,7 +200,8 @@ int job_start(char **argv,
// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
free_job(job);
return -1;
*status = -1;
return NULL;
}
// Give all handles a reference to the job
@ -204,8 +213,8 @@ int job_start(char **argv,
job->in = wstream_new(JOB_WRITE_MAXMEM);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@ -219,77 +228,64 @@ int job_start(char **argv,
}
job_count++;
return job->id;
return job;
}
/// Finds a job instance by id
///
/// @param id The job id
/// @return the Job instance
Job *job_find(int id)
{
Job *job;
if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
|| job->stopped) {
return NULL;
}
return job;
}
/// Terminates a job. This is a non-blocking operation, but if the job exists
/// it's guaranteed to succeed(SIGKILL will eventually be sent)
///
/// @param id The job id
/// @return true if the stop request was successfully sent, false if the job
/// id is invalid(probably because it has already stopped)
bool job_stop(int id)
/// @param job The Job instance
void job_stop(Job *job)
{
Job *job = find_job(id);
if (job == NULL || job->stopped) {
return false;
}
job->stopped = true;
return true;
}
/// Writes data to the job's stdin. This is a non-blocking operation, it
/// returns when the write request was sent.
///
/// @param id The job id
/// @param data Buffer containing the data to be written
/// @param len Size of the data
/// @return true if the write request was successfully sent, false if the job
/// id is invalid(probably because it has already stopped)
bool job_write(int id, char *data, uint32_t len)
/// @param job The Job instance
/// @param buffer The buffer which contains the data to be written
/// @return true if the write request was successfully sent, false if writing
/// to the job stream failed (possibly because the OS buffer is full)
bool job_write(Job *job, WBuffer *buffer)
{
Job *job = find_job(id);
if (job == NULL || job->stopped) {
free(data);
return false;
}
if (!wstream_write(job->in, wstream_new_buffer(data, len, false))) {
job_stop(job->id);
return false;
}
return true;
return wstream_write(job->in, buffer);
}
/// Sets the `defer` flag for a Job instance
///
/// @param rstream The Job id
/// @param defer The new value for the flag
void job_set_defer(Job *job, bool defer)
{
job->defer = defer;
rstream_set_defer(job->out, defer);
rstream_set_defer(job->err, defer);
}
/// Runs the read callback associated with the job exit event
///
/// @param event Object containing data necessary to invoke the callback
void job_exit_event(Event event)
{
Job *job = event.data.job;
// Free the slot now, 'exit_cb' may want to start another job to replace
// this one
table[job->id - 1] = NULL;
if (job->exit_cb) {
// Invoke the exit callback
job->exit_cb(job, job->data);
}
// Free the job resources
free_job(job);
// Stop polling job status if this was the last
job_count--;
if (job_count == 0) {
uv_prepare_stop(&job_prepare);
}
job_exit_callback(event.data.job);
}
/// Get the job id
@ -310,20 +306,32 @@ void *job_data(Job *job)
return job->data;
}
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
// this one
table[job->id - 1] = NULL;
if (job->exit_cb) {
// Invoke the exit callback
job->exit_cb(job, job->data);
}
// Free the job resources
free_job(job);
// Stop polling job status if this was the last
job_count--;
if (job_count == 0) {
uv_prepare_stop(&job_prepare);
}
}
static bool is_alive(Job *job)
{
return uv_process_kill(&job->proc, 0) == 0;
}
static Job * find_job(int id)
{
if (id <= 0 || id > MAX_RUNNING_JOBS) {
return NULL;
}
return table[id - 1];
}
static void free_job(Job *job)
{
uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
@ -385,7 +393,7 @@ static void emit_exit_event(Job *job)
Event event;
event.type = kEventJobExit;
event.data.job = job;
event_push(event);
event_push(event, true);
}
static void close_cb(uv_handle_t *handle)

View File

@ -12,6 +12,8 @@
#include "nvim/os/rstream_defs.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/wstream.h"
#include "nvim/os/wstream_defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/job.h.generated.h"

View File

@ -79,11 +79,13 @@ void msgpack_rpc_call(uint64_t id, msgpack_object *req, msgpack_packer *res)
"Request array size is %u, it should be 4",
req->via.array.size);
msgpack_rpc_error(error_msg, res);
return;
}
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
msgpack_pack_int(res, 0); // no message id yet
msgpack_rpc_error("Id must be a positive integer", res);
return;
}
// Set the response id, which is the same as the request
@ -398,6 +400,31 @@ void msgpack_rpc_free_dictionary(Dictionary value)
free(value.items);
}
UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
msgpack_unpacked* result)
{
if (result->zone != NULL) {
msgpack_zone_free(result->zone);
}
int res = msgpack_unpacker_execute(unpacker);
if (res > 0) {
result->zone = msgpack_unpacker_release_zone(unpacker);
result->data = msgpack_unpacker_data(unpacker);
msgpack_unpacker_reset(unpacker);
return kUnpackResultOk;
}
if (res < 0) {
// Since we couldn't parse it, destroy the data consumed so far
msgpack_unpacker_reset(unpacker);
return kUnpackResultFail;
}
return kUnpackResultNeedMore;
}
REMOTE_FUNCS_IMPL(Buffer, buffer)
REMOTE_FUNCS_IMPL(Window, window)
REMOTE_FUNCS_IMPL(Tabpage, tabpage)

View File

@ -9,6 +9,12 @@
#include "nvim/func_attr.h"
#include "nvim/api/private/defs.h"
typedef enum {
kUnpackResultOk, /// Successfully parsed a document
kUnpackResultFail, /// Got unexpected input
kUnpackResultNeedMore /// Need more data
} UnpackResult;
/// Validates the basic structure of the msgpack-rpc call and fills `res`
/// with the basic response structure.
///
@ -40,6 +46,19 @@ void msgpack_rpc_dispatch(uint64_t id,
msgpack_packer *res)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
/// Try to unpack a msgpack document from the data in the unpacker buffer. This
/// function is a replacement to msgpack.h `msgpack_unpack_next` that lets
/// the called know if the unpacking failed due to bad input or due to missing
/// data.
///
/// @param unpacker The unpacker containing the parse buffer
/// @param result The result which will contain the parsed object
/// @return kUnpackResultOk : An object was parsed
/// kUnpackResultFail : Got bad input
/// kUnpackResultNeedMore: Need more data
UnpackResult msgpack_rpc_unpack(msgpack_unpacker* unpacker,
msgpack_unpacked* result);
/// Finishes the msgpack-rpc call with an error message.
///
/// @param msg The error message

View File

@ -24,7 +24,7 @@ struct rstream {
uv_file fd;
rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos;
bool reading, free_handle, async;
bool reading, free_handle, defer;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@ -38,21 +38,19 @@ struct rstream {
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
/// @param async Flag that specifies if the callback should only be called
/// outside libuv event loop(When processing async events with
/// KE_EVENT). Only the RStream instance reading user input should set
/// this to false
/// @param defer Flag that specifies if callback invocation should be deferred
/// to vim main loop(as a KE_EVENT special key)
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
size_t buffer_size,
void *data,
bool async)
bool defer)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
rv->async = async;
rv->defer = defer;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
@ -213,6 +211,15 @@ size_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
/// Sets the `defer` flag for a a RStream instance
///
/// @param rstream The RStream instance
/// @param defer The new value for the flag
void rstream_set_defer(RStream *rstream, bool defer)
{
rstream->defer = defer;
}
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
@ -333,16 +340,9 @@ static void close_cb(uv_handle_t *handle)
static void emit_read_event(RStream *rstream, bool eof)
{
if (rstream->async) {
Event event;
event.type = kEventRStreamData;
event.data.rstream.ptr = rstream;
event.data.rstream.eof = eof;
event_push(event);
} else {
// Invoke the callback passing in the number of bytes available and data
// associated with the stream
rstream->cb(rstream, rstream->data, eof);
}
Event event;
event.type = kEventRStreamData;
event.data.rstream.ptr = rstream;
event.data.rstream.eof = eof;
event_push(event, rstream->defer);
}

View File

@ -159,5 +159,5 @@ static void signal_cb(uv_signal_t *handle, int signum)
.signum = signum
}
};
event_push(event);
event_push(event, true);
}

View File

@ -21,8 +21,9 @@ struct wstream {
};
struct wbuffer {
size_t refcount, size;
size_t size, refcount;
char *data;
wbuffer_data_finalizer cb;
};
typedef struct {
@ -90,7 +91,7 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
// This should not be called after a wstream was freed
assert(!wstream->freed);
if (wstream->curmem + buffer->size > wstream->maxmem) {
if (wstream->curmem > wstream->maxmem) {
return false;
}
@ -116,19 +117,16 @@ bool wstream_write(WStream *wstream, WBuffer *buffer)
///
/// @param data Data stored by the WBuffer
/// @param size The size of the data array
/// @param copy If true, the data will be copied into the WBuffer
/// @param cb Pointer to function that will be responsible for freeing
/// the buffer data(passing 'free' will work as expected).
/// @return The allocated WBuffer instance
WBuffer *wstream_new_buffer(char *data, size_t size, bool copy)
WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
rv->refcount = 0;
if (copy) {
rv->data = xmemdup(data, size);
} else {
rv->data = data;
}
rv->cb = cb;
rv->data = data;
return rv;
}
@ -141,8 +139,7 @@ static void write_cb(uv_write_t *req, int status)
data->wstream->curmem -= data->buffer->size;
if (!--data->buffer->refcount) {
// Free the data written to the stream
free(data->buffer->data);
data->buffer->cb(data->buffer->data);
free(data->buffer);
}

View File

@ -3,6 +3,7 @@
typedef struct wbuffer WBuffer;
typedef struct wstream WStream;
typedef void (*wbuffer_data_finalizer)(void *data);
#endif // NVIM_OS_WSTREAM_DEFS_H