mirror of
https://github.com/neovim/neovim.git
synced 2025-02-25 18:55:25 -06:00
channels: refactor jobwait
This commit is contained in:
parent
5517d2323b
commit
f629f8312d
@ -639,12 +639,12 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
|
|||||||
terminal_close(chan->term, msg);
|
terminal_close(chan->term, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chan->status_ptr) {
|
// if status is -1 the process did not really exit,
|
||||||
*chan->status_ptr = status;
|
// we just closed the handle onto a detached process
|
||||||
|
if (status >= 0) {
|
||||||
|
process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
|
|
||||||
|
|
||||||
channel_decref(chan);
|
channel_decref(chan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,8 +74,6 @@ struct Channel {
|
|||||||
CallbackReader on_stdout;
|
CallbackReader on_stdout;
|
||||||
CallbackReader on_stderr;
|
CallbackReader on_stderr;
|
||||||
Callback on_exit;
|
Callback on_exit;
|
||||||
|
|
||||||
varnumber_T *status_ptr; // TODO: refactor?
|
|
||||||
};
|
};
|
||||||
|
|
||||||
EXTERN PMap(uint64_t) *channels;
|
EXTERN PMap(uint64_t) *channels;
|
||||||
|
@ -11693,28 +11693,31 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
list_T *args = argvars[0].vval.v_list;
|
list_T *args = argvars[0].vval.v_list;
|
||||||
list_T *rv = tv_list_alloc();
|
Channel **jobs = xcalloc(args->lv_len, sizeof(*jobs));
|
||||||
|
|
||||||
ui_busy_start();
|
ui_busy_start();
|
||||||
MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop);
|
MultiQueue *waiting_jobs = multiqueue_new_parent(loop_on_put, &main_loop);
|
||||||
// For each item in the input list append an integer to the output list. -3
|
// For each item in the input list append an integer to the output list. -3
|
||||||
// is used to represent an invalid job id, -2 is for a interrupted job and
|
// is used to represent an invalid job id, -2 is for a interrupted job and
|
||||||
// -1 for jobs that were skipped or timed out.
|
// -1 for jobs that were skipped or timed out.
|
||||||
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
|
||||||
Channel *data = NULL;
|
int i = 0;
|
||||||
|
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next, i++) {
|
||||||
|
Channel *chan = NULL;
|
||||||
if (arg->li_tv.v_type != VAR_NUMBER
|
if (arg->li_tv.v_type != VAR_NUMBER
|
||||||
|| !(data = find_job(arg->li_tv.vval.v_number, false))) {
|
|| !(chan = find_job(arg->li_tv.vval.v_number, false))) {
|
||||||
tv_list_append_number(rv, -3);
|
jobs[i] = NULL;
|
||||||
} else {
|
} else {
|
||||||
// append the list item and set the status pointer so we'll collect the
|
jobs[i] = chan;
|
||||||
// status code when the job exits
|
channel_incref(chan);
|
||||||
tv_list_append_number(rv, -1);
|
if (chan->stream.proc.status < 0) {
|
||||||
data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
|
// Process any pending events for the job because we'll temporarily
|
||||||
// Process any pending events for the job because we'll temporarily
|
// replace the parent queue
|
||||||
// replace the parent queue
|
multiqueue_process_events(chan->events);
|
||||||
multiqueue_process_events(data->events);
|
multiqueue_replace_parent(chan->events, waiting_jobs);
|
||||||
multiqueue_replace_parent(data->events, waiting_jobs);
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -11725,25 +11728,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
|
|||||||
before = os_hrtime();
|
before = os_hrtime();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
for (i = 0; i < args->lv_len; i++) {
|
||||||
Channel *data = NULL;
|
|
||||||
if (remaining == 0) {
|
if (remaining == 0) {
|
||||||
// timed out
|
// timed out
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (arg->li_tv.v_type != VAR_NUMBER
|
|
||||||
|| !(data = find_job(arg->li_tv.vval.v_number, false))) {
|
// if the job already exited, but wasn't freed yet
|
||||||
|
if (jobs[i] == NULL || jobs[i]->stream.proc.status >= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int status = process_wait((Process *)&data->stream.proc, remaining,
|
|
||||||
|
int status = process_wait(&jobs[i]->stream.proc, remaining,
|
||||||
waiting_jobs);
|
waiting_jobs);
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
// interrupted or timed out, skip remaining jobs.
|
// interrupted or timed out, skip remaining jobs.
|
||||||
if (status == -2) {
|
|
||||||
// set the status so the user can distinguish between interrupted and
|
|
||||||
// skipped/timeout jobs.
|
|
||||||
*data->status_ptr = -2;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (remaining > 0) {
|
if (remaining > 0) {
|
||||||
@ -11756,30 +11755,24 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
list_T *rv = tv_list_alloc();
|
||||||
Channel *data = NULL;
|
|
||||||
if (arg->li_tv.v_type != VAR_NUMBER
|
|
||||||
|| !(data = find_job(arg->li_tv.vval.v_number, false))) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// remove the status pointer because the list may be freed before the
|
|
||||||
// job exits
|
|
||||||
data->status_ptr = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// restore the parent queue for any jobs still alive
|
// restore the parent queue for any jobs still alive
|
||||||
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
for (i = 0; i < args->lv_len; i++) {
|
||||||
Channel *data = NULL;
|
if (jobs[i] == NULL) {
|
||||||
if (arg->li_tv.v_type != VAR_NUMBER
|
tv_list_append_number(rv, -3);
|
||||||
|| !(data = find_job(arg->li_tv.vval.v_number, false))) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// restore the parent queue for the job
|
// restore the parent queue for the job
|
||||||
multiqueue_process_events(data->events);
|
multiqueue_process_events(jobs[i]->events);
|
||||||
multiqueue_replace_parent(data->events, main_loop.events);
|
multiqueue_replace_parent(jobs[i]->events, main_loop.events);
|
||||||
|
|
||||||
|
tv_list_append_number(rv, jobs[i]->stream.proc.status);
|
||||||
|
channel_decref(jobs[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
multiqueue_free(waiting_jobs);
|
multiqueue_free(waiting_jobs);
|
||||||
|
xfree(jobs);
|
||||||
ui_busy_stop();
|
ui_busy_stop();
|
||||||
rv->lv_refcount++;
|
rv->lv_refcount++;
|
||||||
rettv->v_type = VAR_LIST;
|
rettv->v_type = VAR_LIST;
|
||||||
|
@ -145,16 +145,15 @@ void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
|
|||||||
/// @param process Process instance
|
/// @param process Process instance
|
||||||
/// @param ms Time in milliseconds to wait for the process.
|
/// @param ms Time in milliseconds to wait for the process.
|
||||||
/// 0 for no wait. -1 to wait until the process quits.
|
/// 0 for no wait. -1 to wait until the process quits.
|
||||||
/// @return Exit code of the process.
|
/// @return Exit code of the process. proc->status will have the same value.
|
||||||
/// -1 if the timeout expired while the process is still running.
|
/// -1 if the timeout expired while the process is still running.
|
||||||
/// -2 if the user interruped the wait.
|
/// -2 if the user interruped the wait.
|
||||||
int process_wait(Process *proc, int ms, MultiQueue *events)
|
int process_wait(Process *proc, int ms, MultiQueue *events)
|
||||||
FUNC_ATTR_NONNULL_ARG(1)
|
FUNC_ATTR_NONNULL_ARG(1)
|
||||||
{
|
{
|
||||||
int status = -1; // default
|
|
||||||
bool interrupted = false;
|
bool interrupted = false;
|
||||||
if (!proc->refcount) {
|
if (!proc->refcount) {
|
||||||
status = proc->status;
|
int status = proc->status;
|
||||||
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
|
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
@ -190,7 +189,9 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
|
|||||||
if (proc->refcount == 1) {
|
if (proc->refcount == 1) {
|
||||||
// Job exited, collect status and manually invoke close_cb to free the job
|
// Job exited, collect status and manually invoke close_cb to free the job
|
||||||
// resources
|
// resources
|
||||||
status = interrupted ? -2 : proc->status;
|
if (interrupted) {
|
||||||
|
proc->status = -2;
|
||||||
|
}
|
||||||
decref(proc);
|
decref(proc);
|
||||||
if (events) {
|
if (events) {
|
||||||
// the decref call created an exit event, process it now
|
// the decref call created an exit event, process it now
|
||||||
@ -200,7 +201,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
|
|||||||
proc->refcount--;
|
proc->refcount--;
|
||||||
}
|
}
|
||||||
|
|
||||||
return status;
|
return proc->status;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ask a process to terminate and eventually kill if it doesn't respond
|
/// Ask a process to terminate and eventually kill if it doesn't respond
|
||||||
|
@ -39,7 +39,7 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
|
|||||||
.loop = loop,
|
.loop = loop,
|
||||||
.events = NULL,
|
.events = NULL,
|
||||||
.pid = 0,
|
.pid = 0,
|
||||||
.status = 0,
|
.status = -1,
|
||||||
.refcount = 0,
|
.refcount = 0,
|
||||||
.stopped_time = 0,
|
.stopped_time = 0,
|
||||||
.cwd = NULL,
|
.cwd = NULL,
|
||||||
|
Loading…
Reference in New Issue
Block a user