channels: stderr channel

This commit is contained in:
Björn Linse 2017-06-09 08:40:24 +02:00
parent 90e5cc5484
commit 5af4703177
7 changed files with 120 additions and 56 deletions

View File

@ -14,6 +14,12 @@
static bool did_stdio = false; static bool did_stdio = false;
PMap(uint64_t) *channels = NULL; PMap(uint64_t) *channels = NULL;
/// next free id for a job or rpc channel
/// 1 is reserved for stdio channel
/// 2 is reserved for stderr channel
static uint64_t next_chan_id = CHAN_STDERR+1;
typedef struct { typedef struct {
Channel *data; Channel *data;
Callback *callback; Callback *callback;
@ -73,7 +79,6 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
} }
} else if ((part == kChannelPartStdin || part == kChannelPartStdout) } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
&& chan->is_rpc) { && chan->is_rpc) {
// EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
*error = (const char *)e_invstreamrpc; *error = (const char *)e_invstreamrpc;
return false; return false;
} }
@ -117,6 +122,21 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
} }
break; break;
case kChannelStreamStderr:
if (part != kChannelPartAll && part != kChannelPartStderr) {
*error = (const char *)e_invstream;
return false;
}
if (!chan->stream.err.closed) {
chan->stream.err.closed = true;
// Don't close on exit, in case late error messages
if (!exiting) {
fclose(stderr);
}
channel_decref(chan);
}
break;
case kChannelStreamInternal: case kChannelStreamInternal:
if (!close_main) { if (!close_main) {
*error = (const char *)e_invstream; *error = (const char *)e_invstream;
@ -132,6 +152,7 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
void channel_init(void) void channel_init(void)
{ {
channels = pmap_new(uint64_t)(); channels = pmap_new(uint64_t)();
channel_alloc(kChannelStreamStderr);
rpc_init(); rpc_init();
remote_ui_init(); remote_ui_init();
} }
@ -143,7 +164,13 @@ void channel_init(void)
static Channel *channel_alloc(ChannelStreamType type) static Channel *channel_alloc(ChannelStreamType type)
{ {
Channel *chan = xcalloc(1, sizeof(*chan)); Channel *chan = xcalloc(1, sizeof(*chan));
chan->id = type == kChannelStreamStdio ? 1 : next_chan_id++; if (type == kChannelStreamStdio) {
chan->id = CHAN_STDIO;
} else if (type == kChannelStreamStderr) {
chan->id = CHAN_STDERR;
} else {
chan->id = next_chan_id++;
}
chan->events = multiqueue_new_child(main_loop.events); chan->events = multiqueue_new_child(main_loop.events);
chan->refcount = 1; chan->refcount = 1;
chan->streamtype = type; chan->streamtype = type;
@ -403,6 +430,46 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
return channel->id; return channel->id;
} }
/// @param data will be consumed
size_t channel_send(uint64_t id, char *data, size_t len, const char **error)
{
Channel *chan = find_channel(id);
if (!chan) {
EMSG(_(e_invchan));
goto err;
}
if (chan->streamtype == kChannelStreamStderr) {
if (chan->stream.err.closed) {
*error = _("Can't send data to closed stream");
goto err;
}
// unbuffered write
size_t written = fwrite(data, len, 1, stderr);
xfree(data);
return len * written;
}
Stream *in = channel_instream(chan);
if (in->closed) {
*error = _("Can't send data to closed stream");
goto err;
}
if (chan->is_rpc) {
*error = _("Can't send raw data to rpc channel");
goto err;
}
WBuffer *buf = wstream_new_buffer(data, len, 1, xfree);
return wstream_write(in, buf) ? len : 0;
err:
xfree(data);
return 0;
}
// vimscript job callbacks must be executed on Nvim main loop // vimscript job callbacks must be executed on Nvim main loop
static inline void process_channel_event(Channel *chan, Callback *callback, static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type, char *buf, const char *type, char *buf,

View File

@ -9,10 +9,14 @@
#include "nvim/eval/typval.h" #include "nvim/eval/typval.h"
#include "nvim/msgpack_rpc/channel_defs.h" #include "nvim/msgpack_rpc/channel_defs.h"
#define CHAN_STDIO 1
#define CHAN_STDERR 2
typedef enum { typedef enum {
kChannelStreamProc, kChannelStreamProc,
kChannelStreamSocket, kChannelStreamSocket,
kChannelStreamStdio, kChannelStreamStdio,
kChannelStreamStderr,
kChannelStreamInternal kChannelStreamInternal
} ChannelStreamType; } ChannelStreamType;
@ -30,6 +34,10 @@ typedef struct {
Stream out; Stream out;
} StdioPair; } StdioPair;
typedef struct {
bool closed;
} StderrState;
typedef struct { typedef struct {
Callback cb; Callback cb;
garray_T buffer; garray_T buffer;
@ -56,6 +64,7 @@ struct Channel {
PtyProcess pty; PtyProcess pty;
Stream socket; Stream socket;
StdioPair stdio; StdioPair stdio;
StderrState err;
} stream; } stream;
bool is_rpc; bool is_rpc;
@ -95,6 +104,7 @@ static inline Stream *channel_instream(Channel *chan)
return &chan->stream.stdio.out; return &chan->stream.stdio.out;
case kChannelStreamInternal: case kChannelStreamInternal:
case kChannelStreamStderr:
abort(); abort();
} }
abort(); abort();
@ -114,6 +124,7 @@ static inline Stream *channel_outstream(Channel *chan)
return &chan->stream.stdio.in; return &chan->stream.stdio.in;
case kChannelStreamInternal: case kChannelStreamInternal:
case kChannelStreamStderr:
abort(); abort();
} }
abort(); abort();

View File

@ -366,6 +366,7 @@ static struct vimvar {
VV(VV_DYING, "dying", VAR_NUMBER, VV_RO), VV(VV_DYING, "dying", VAR_NUMBER, VV_RO),
VV(VV_EXCEPTION, "exception", VAR_STRING, VV_RO), VV(VV_EXCEPTION, "exception", VAR_STRING, VV_RO),
VV(VV_THROWPOINT, "throwpoint", VAR_STRING, VV_RO), VV(VV_THROWPOINT, "throwpoint", VAR_STRING, VV_RO),
VV(VV_STDERR, "stderr", VAR_NUMBER, VV_RO),
VV(VV_REG, "register", VAR_STRING, VV_RO), VV(VV_REG, "register", VAR_STRING, VV_RO),
VV(VV_CMDBANG, "cmdbang", VAR_NUMBER, VV_RO), VV(VV_CMDBANG, "cmdbang", VAR_NUMBER, VV_RO),
VV(VV_INSERTMODE, "insertmode", VAR_STRING, VV_RO), VV(VV_INSERTMODE, "insertmode", VAR_STRING, VV_RO),
@ -586,6 +587,7 @@ void eval_init(void)
v_event->dv_lock = VAR_FIXED; v_event->dv_lock = VAR_FIXED;
set_vim_var_dict(VV_EVENT, v_event); set_vim_var_dict(VV_EVENT, v_event);
set_vim_var_list(VV_ERRORS, tv_list_alloc()); set_vim_var_list(VV_ERRORS, tv_list_alloc());
set_vim_var_nr(VV_STDERR, CHAN_STDERR);
set_vim_var_nr(VV_SEARCHFORWARD, 1L); set_vim_var_nr(VV_SEARCHFORWARD, 1L);
set_vim_var_nr(VV_HLSEARCH, 1L); set_vim_var_nr(VV_HLSEARCH, 1L);
set_vim_var_nr(VV_COUNT1, 1); set_vim_var_nr(VV_COUNT1, 1);
@ -7361,6 +7363,37 @@ static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
} }
} }
// "chansend(id, data)" function
static void f_chansend(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) {
// First argument is the channel id and second is the data to write
EMSG(_(e_invarg));
return;
}
ptrdiff_t input_len = 0;
char *input = save_tv_as_string(&argvars[1], &input_len, false);
if (!input) {
// Either the error has been handled by save_tv_as_string(),
// or there is no input to send.
return;
}
uint64_t id = argvars[0].vval.v_number;
const char *error = NULL;
rettv->vval.v_number = channel_send(id, input, input_len, &error);
if (error) {
EMSG(error);
}
}
/* /*
* "char2nr(string)" function * "char2nr(string)" function
*/ */
@ -11454,52 +11487,6 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)
rettv->vval.v_number = proc->pid; rettv->vval.v_number = proc->pid;
} }
// "jobsend()" function
static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER || argvars[1].v_type == VAR_UNKNOWN) {
// First argument is the job id and second is the string or list to write
// to the job's stdin
EMSG(_(e_invarg));
return;
}
Channel *data = find_channel(argvars[0].vval.v_number);
if (!data) {
EMSG(_(e_invchan));
return;
}
Stream *in = channel_instream(data);
if (in->closed) {
EMSG(_("Can't send data to the job: stdin is closed"));
return;
}
if (data->is_rpc) {
EMSG(_("Can't send raw data to rpc channel"));
return;
}
ptrdiff_t input_len = 0;
char *input = save_tv_as_string(&argvars[1], &input_len, false);
if (!input) {
// Either the error has been handled by save_tv_as_string(), or there is no
// input to send.
return;
}
WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree);
rettv->vval.v_number = wstream_write(in, buf);
}
// "jobresize(job, width, height)" function // "jobresize(job, width, height)" function
static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{ {

View File

@ -56,6 +56,7 @@ typedef enum {
VV_DYING, VV_DYING,
VV_EXCEPTION, VV_EXCEPTION,
VV_THROWPOINT, VV_THROWPOINT,
VV_STDERR,
VV_REG, VV_REG,
VV_CMDBANG, VV_CMDBANG,
VV_INSERTMODE, VV_INSERTMODE,

View File

@ -56,6 +56,7 @@ return {
ceil={args=1, func="float_op_wrapper", data="&ceil"}, ceil={args=1, func="float_op_wrapper", data="&ceil"},
changenr={}, changenr={},
chanclose={args={1, 2}}, chanclose={args={1, 2}},
chansend={args=2},
char2nr={args={1, 2}}, char2nr={args={1, 2}},
cindent={args=1}, cindent={args=1},
clearmatches={}, clearmatches={},
@ -177,7 +178,7 @@ return {
jobclose={args={1, 2}, func="f_chanclose"}, jobclose={args={1, 2}, func="f_chanclose"},
jobpid={args=1}, jobpid={args=1},
jobresize={args=3}, jobresize={args=3},
jobsend={args=2}, jobsend={args=2, func="f_chansend"},
jobstart={args={1, 2}}, jobstart={args={1, 2}},
jobstop={args=1}, jobstop={args=1},
jobwait={args={1, 2}}, jobwait={args={1, 2}},

View File

@ -1199,10 +1199,6 @@ EXTERN bool embedded_mode INIT(= false);
// or read/write to stdio (unless embedding) // or read/write to stdio (unless embedding)
EXTERN bool headless_mode INIT(= false); EXTERN bool headless_mode INIT(= false);
/// next free id for a job or rpc channel
/// 1 is reserved for stdio channel
EXTERN uint64_t next_chan_id INIT(= 2);
/// Used to track the status of external functions. /// Used to track the status of external functions.
/// Currently only used for iconv(). /// Currently only used for iconv().
typedef enum { typedef enum {

View File

@ -297,7 +297,7 @@ int main(int argc, char **argv)
assert(p_ch >= 0 && Rows >= p_ch && Rows - p_ch <= INT_MAX); assert(p_ch >= 0 && Rows >= p_ch && Rows - p_ch <= INT_MAX);
cmdline_row = (int)(Rows - p_ch); cmdline_row = (int)(Rows - p_ch);
msg_row = cmdline_row; msg_row = cmdline_row;
screenalloc(false); /* allocate screen buffers */ screenalloc(false); // allocate screen buffers
set_init_2(headless_mode); set_init_2(headless_mode);
TIME_MSG("inits 2"); TIME_MSG("inits 2");
@ -310,8 +310,9 @@ int main(int argc, char **argv)
/* Set the break level after the terminal is initialized. */ /* Set the break level after the terminal is initialized. */
debug_break_level = params.use_debug_break_level; debug_break_level = params.use_debug_break_level;
bool reading_input = !headless_mode && (params.input_isatty bool reading_input = !headless_mode
|| params.output_isatty || params.err_isatty); && (params.input_isatty || params.output_isatty
|| params.err_isatty);
if (reading_input) { if (reading_input) {
// One of the startup commands (arguments, sourced scripts or plugins) may // One of the startup commands (arguments, sourced scripts or plugins) may