channels: generalize jobclose()

This commit is contained in:
Björn Linse 2017-06-08 17:15:53 +02:00
parent 3e59c1e20d
commit 90e5cc5484
9 changed files with 173 additions and 156 deletions

View File

@ -35,10 +35,99 @@ void channel_teardown(void)
Channel *channel;
map_foreach_value(channels, channel, {
(void)channel; // close_channel(channel);
channel_close(channel->id, kChannelPartAll, NULL);
});
}
/// Closes a channel
///
/// @param id The channel id
/// @return true if successful, false otherwise
bool channel_close(uint64_t id, ChannelPart part, const char **error)
{
Channel *chan;
Process *proc;
const char *dummy;
if (!error) {
error = &dummy;
}
if (!(chan = find_channel(id))) {
if (id < next_chan_id) {
// allow double close, even though we can't say what parts was valid.
return true;
}
*error = (const char *)e_invchan;
return false;
}
bool close_main = false;
if (part == kChannelPartRpc || part == kChannelPartAll) {
close_main = true;
if (chan->is_rpc) {
rpc_close(chan);
} else if (part == kChannelPartRpc) {
*error = (const char *)e_invstream;
return false;
}
} else if ((part == kChannelPartStdin || part == kChannelPartStdout)
&& chan->is_rpc) {
// EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
*error = (const char *)e_invstreamrpc;
return false;
}
switch (chan->streamtype) {
case kChannelStreamSocket:
if (!close_main) {
*error = (const char *)e_invstream;
return false;
}
stream_may_close(&chan->stream.socket);
break;
case kChannelStreamProc:
proc = (Process *)&chan->stream.proc;
if (part == kChannelPartStdin || close_main) {
stream_may_close(&proc->in);
}
if (part == kChannelPartStdout || close_main) {
stream_may_close(&proc->out);
}
if (part == kChannelPartStderr || part == kChannelPartAll) {
stream_may_close(&proc->err);
}
if (proc->type == kProcessTypePty && part == kChannelPartAll) {
pty_process_close_master(&chan->stream.pty);
}
break;
case kChannelStreamStdio:
if (part == kChannelPartStdin || close_main) {
stream_may_close(&chan->stream.stdio.in);
}
if (part == kChannelPartStdout || close_main) {
stream_may_close(&chan->stream.stdio.out);
}
if (part == kChannelPartStderr) {
*error = (const char *)e_invstream;
return false;
}
break;
case kChannelStreamInternal:
if (!close_main) {
*error = (const char *)e_invstream;
return false;
}
break;
}
return true;
}
/// Initializes the module
void channel_init(void)
{
@ -239,7 +328,6 @@ uint64_t channel_connect(bool tcp, const char *address,
return 0;
}
channel_incref(channel); // close channel only after the stream is closed
channel->stream.socket.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel;
wstream_init(&channel->stream.socket, 0);
@ -264,7 +352,6 @@ void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = channel_alloc(kChannelStreamSocket);
socket_watcher_accept(watcher, &channel->stream.socket);
channel_incref(channel); // close channel only after the stream is closed
channel->stream.socket.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel;
wstream_init(&channel->stream.socket, 0);
@ -277,7 +364,6 @@ void channel_from_connection(SocketWatcher *watcher)
static uint64_t channel_create_internal_rpc(void)
{
Channel *channel = channel_alloc(kChannelStreamInternal);
channel_incref(channel); // internal channel lives until process exit
rpc_start(channel);
return channel->id;
}
@ -427,10 +513,6 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
terminal_close(chan->term, msg);
}
if (chan->is_rpc) {
channel_process_exit(chan->id, status);
}
if (chan->status_ptr) {
*chan->status_ptr = status;
}

View File

@ -16,6 +16,15 @@ typedef enum {
kChannelStreamInternal
} ChannelStreamType;
typedef enum {
kChannelPartStdin,
kChannelPartStdout,
kChannelPartStderr,
kChannelPartRpc,
kChannelPartAll
} ChannelPart;
typedef struct {
Stream in;
Stream out;

View File

@ -7322,6 +7322,45 @@ static void f_changenr(typval_T *argvars, typval_T *rettv, FunPtr fptr)
rettv->vval.v_number = curbuf->b_u_seq_cur;
}
// "chanclose(id[, stream])" function
static void f_chanclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING
&& argvars[1].v_type != VAR_UNKNOWN)) {
EMSG(_(e_invarg));
return;
}
ChannelPart part = kChannelPartAll;
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
part = kChannelPartStdin;
} else if (!strcmp(stream, "stdout")) {
part = kChannelPartStdout;
} else if (!strcmp(stream, "stderr")) {
part = kChannelPartStderr;
} else if (!strcmp(stream, "rpc")) {
part = kChannelPartRpc;
} else {
EMSG2(_("Invalid channel stream \"%s\""), stream);
return;
}
}
const char *error;
rettv->vval.v_number = channel_close(argvars[0].vval.v_number, part, &error);
if (!rettv->vval.v_number) {
EMSG(error);
}
}
/*
* "char2nr(string)" function
*/
@ -11391,67 +11430,6 @@ static void f_items(typval_T *argvars, typval_T *rettv, FunPtr fptr)
dict_list(argvars, rettv, 2);
}
// "jobclose(id[, stream])" function
static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER || (argvars[1].v_type != VAR_STRING
&& argvars[1].v_type != VAR_UNKNOWN)) {
EMSG(_(e_invarg));
return;
}
Channel *data = find_job(argvars[0].vval.v_number, true);
if (!data) {
return;
}
Process *proc = (Process *)&data->stream.proc;
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
if (data->is_rpc) {
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
} else {
process_close_in(proc);
}
} else if (!strcmp(stream, "stdout")) {
if (data->is_rpc) {
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
} else {
process_close_out(proc);
}
} else if (!strcmp(stream, "stderr")) {
process_close_err(proc);
} else if (!strcmp(stream, "rpc")) {
if (data->is_rpc) {
channel_close(data->id);
} else {
EMSG(_("Invalid job stream: Not an rpc job"));
}
} else {
EMSG2(_("Invalid job stream \"%s\""), stream);
}
} else {
if (data->is_rpc) {
channel_close(data->id);
process_close_err(proc);
} else {
process_close_streams(proc);
if (proc->type == kProcessTypePty) {
pty_process_close_master(&data->stream.pty);
}
}
}
}
// "jobpid(id)" function
static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr)
{
@ -13933,7 +13911,12 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr)
if (find_job(id, false)) {
f_jobstop(argvars, rettv, NULL);
} else {
rettv->vval.v_number = channel_close(id);
const char *error;
rettv->vval.v_number = channel_close(argvars[0].vval.v_number,
kChannelPartRpc, &error);
if (!rettv->vval.v_number) {
EMSG(error);
}
}
}

View File

@ -55,6 +55,7 @@ return {
call={args={2, 3}},
ceil={args=1, func="float_op_wrapper", data="&ceil"},
changenr={},
chanclose={args={1, 2}},
char2nr={args={1, 2}},
cindent={args=1},
clearmatches={},
@ -173,7 +174,7 @@ return {
islocked={args=1},
id={args=1},
items={args=1},
jobclose={args={1, 2}},
jobclose={args={1, 2}, func="f_chanclose"},
jobpid={args=1},
jobresize={args=3},
jobsend={args=2},

View File

@ -25,13 +25,6 @@
// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
#define KILL_TIMEOUT_MS 2000
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (!proc->stream.closed) { \
stream_close(&proc->stream, NULL, NULL); \
} \
} while (0)
static bool process_is_tearing_down = false;
/// @returns zero on success, or negative error code
@ -140,27 +133,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
pty_process_teardown(loop);
}
// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
process_close_in(proc);
process_close_out(proc);
process_close_err(proc);
}
void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, in);
}
void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, out);
}
void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, err);
stream_may_close(&proc->in);
stream_may_close(&proc->out);
stream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@ -237,8 +214,9 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
switch (proc->type) {
case kProcessTypeUv:
// Close the process's stdin. If the process doesn't close its own
// stdout/stderr, they will be closed when it exits (voluntarily or not).
process_close_in(proc);
// stdout/stderr, they will be closed when it exits(possibly due to being
// terminated after a timeout)
stream_may_close(&proc->in);
ILOG("Sending SIGTERM to pid %d", proc->pid);
uv_kill(proc->pid, SIGTERM);
break;

View File

@ -92,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
}
}
void stream_may_close(Stream *stream)
{
if (!stream->closed) {
stream_close(stream, NULL, NULL);
}
}
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{

View File

@ -1082,6 +1082,9 @@ EXTERN char_u e_jobspawn[] INIT(= N_(
EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty"));
EXTERN char_u e_stdiochan2[] INIT(= N_(
"E905: Couldn't open stdio channel: %s"));
EXTERN char_u e_invstream[] INIT(= N_("E906: invalid stream for channel"));
EXTERN char_u e_invstreamrpc[] INIT(= N_(
"E906: invalid stream for rpc channel, use 'rpc'"));
EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\""));
EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s"));
EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number"));

View File

@ -54,6 +54,7 @@ void rpc_init(void)
void rpc_start(Channel *channel)
{
channel_incref(channel);
channel->is_rpc = true;
RpcState *rpc = &channel->rpc;
rpc->closed = false;
@ -204,31 +205,6 @@ void rpc_unsubscribe(uint64_t id, char *event)
unsubscribe(channel, event);
}
/// Closes a channel
///
/// @param id The channel id
/// @return true if successful, false otherwise
bool channel_close(uint64_t id)
{
Channel *channel;
if (!(channel = find_rpc_channel(id))) {
return false;
}
close_channel(channel);
return true;
}
void channel_process_exit(uint64_t id, int status)
{
Channel *channel = pmap_get(uint64_t)(channels, id);
// channel_decref(channel); remove??
channel->rpc.closed = true;
}
// rstream.c:read_event() invokes this as stream->read_cb().
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
void *data, bool eof)
{
@ -236,7 +212,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
channel_incref(channel);
if (eof) {
close_channel(channel);
channel_close(channel->id, kChannelPartRpc, NULL);
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
@ -540,43 +516,25 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
/// Close the channel streams/process and free the channel resources.
/// TODO: move to channel.h
static void close_channel(Channel *channel)
/// Mark rpc state as closed, and release its reference to the channel.
/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
void rpc_close(Channel *channel)
{
if (channel->rpc.closed) {
return;
}
channel->rpc.closed = true;
switch (channel->streamtype) {
case kChannelStreamSocket:
stream_close(&channel->stream.socket, NULL, NULL);
break;
case kChannelStreamProc:
// Only close the rpc channel part,
// there could be an error message on the stderr stream
process_close_in(&channel->stream.proc);
process_close_out(&channel->stream.proc);
break;
case kChannelStreamStdio:
stream_close(&channel->stream.stdio.in, NULL, NULL);
stream_close(&channel->stream.stdio.out, NULL, NULL);
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
case kChannelStreamInternal:
// nothing to free.
break;
}
channel_decref(channel);
if (channel->streamtype == kChannelStreamStdio) {
multiqueue_put(main_loop.fast_events, exit_event, 0);
}
}
static void exit_event(void **argv)
{
channel_decref(argv[0]);
if (!exiting) {
mch_exit(0);
}
@ -642,7 +600,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
frame->result = STRING_OBJ(cstr_to_string(msg));
}
close_channel(channel);
channel_close(channel->id, kChannelPartRpc, NULL);
}
static WBuffer *serialize_request(uint64_t channel_id,

View File

@ -677,10 +677,6 @@ static void shell_write_cb(Stream *stream, void *data, int status)
msg_schedule_emsgf(_("E5677: Error writing input to shell-command: %s"),
uv_err_name(status));
}
if (stream->closed) { // Process may have exited before this write.
WLOG("stream was already closed");
return;
}
stream_close(stream, NULL, NULL);
}