mirror of
https://github.com/neovim/neovim.git
synced 2025-02-25 18:55:25 -06:00
eval: Fix jobwait() to process multiple jobs concurrently
The new event processing architecture changed `jobwait()` semantics: Only one job is processed at time since process_wait only focuses on one queue. This fixes the problem with a few changes: - Allow the event queue polled by `process_wait` to be overriden by a new argument. - Allow the parent queue to be overriden with `queue_replace_parent` - Create a temporary queue that serves as the parent for all jobs passed to `jobwait()`
This commit is contained in:
parent
a816c726bb
commit
f1de097dbb
@ -11037,6 +11037,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
|
|||||||
list_T *rv = list_alloc();
|
list_T *rv = list_alloc();
|
||||||
|
|
||||||
ui_busy_start();
|
ui_busy_start();
|
||||||
|
Queue *waiting_jobs = queue_new_parent(loop_on_put, &loop);
|
||||||
// For each item in the input list append an integer to the output list. -3
|
// For each item in the input list append an integer to the output list. -3
|
||||||
// is used to represent an invalid job id, -2 is for a interrupted job and
|
// is used to represent an invalid job id, -2 is for a interrupted job and
|
||||||
// -1 for jobs that were skipped or timed out.
|
// -1 for jobs that were skipped or timed out.
|
||||||
@ -11050,6 +11051,10 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
|
|||||||
// status code when the job exits
|
// status code when the job exits
|
||||||
list_append_number(rv, -1);
|
list_append_number(rv, -1);
|
||||||
data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
|
data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
|
||||||
|
// Process any pending events for the job because we'll temporarily
|
||||||
|
// replace the parent queue
|
||||||
|
queue_process_events(data->events);
|
||||||
|
queue_replace_parent(data->events, waiting_jobs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -11070,7 +11075,7 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
|
|||||||
|| !(data = find_job(arg->li_tv.vval.v_number))) {
|
|| !(data = find_job(arg->li_tv.vval.v_number))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int status = process_wait((Process *)&data->proc, remaining);
|
int status = process_wait((Process *)&data->proc, remaining, waiting_jobs);
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
// interrupted or timed out, skip remaining jobs.
|
// interrupted or timed out, skip remaining jobs.
|
||||||
if (status == -2) {
|
if (status == -2) {
|
||||||
@ -11090,9 +11095,6 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// poll to ensure any pending callbacks from the last job are invoked
|
|
||||||
loop_poll_events(&loop, 0);
|
|
||||||
|
|
||||||
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
||||||
TerminalJobData *data = NULL;
|
TerminalJobData *data = NULL;
|
||||||
if (arg->li_tv.v_type != VAR_NUMBER
|
if (arg->li_tv.v_type != VAR_NUMBER
|
||||||
@ -11103,8 +11105,21 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv)
|
|||||||
// job exits
|
// job exits
|
||||||
data->status_ptr = NULL;
|
data->status_ptr = NULL;
|
||||||
}
|
}
|
||||||
ui_busy_stop();
|
|
||||||
|
|
||||||
|
// restore the parent queue for any jobs still alive
|
||||||
|
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
|
||||||
|
TerminalJobData *data = NULL;
|
||||||
|
if (arg->li_tv.v_type != VAR_NUMBER
|
||||||
|
|| !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// restore the parent queue for the job
|
||||||
|
queue_process_events(data->events);
|
||||||
|
queue_replace_parent(data->events, loop.events);
|
||||||
|
}
|
||||||
|
|
||||||
|
queue_free(waiting_jobs);
|
||||||
|
ui_busy_stop();
|
||||||
rv->lv_refcount++;
|
rv->lv_refcount++;
|
||||||
rettv->v_type = VAR_LIST;
|
rettv->v_type = VAR_LIST;
|
||||||
rettv->vval.v_list = rv;
|
rettv->vval.v_list = rv;
|
||||||
|
@ -22,7 +22,7 @@ void loop_init(Loop *loop, void *data)
|
|||||||
loop->uv.data = loop;
|
loop->uv.data = loop;
|
||||||
loop->children = kl_init(WatcherPtr);
|
loop->children = kl_init(WatcherPtr);
|
||||||
loop->children_stop_requests = 0;
|
loop->children_stop_requests = 0;
|
||||||
loop->events = queue_new_parent(on_put, loop);
|
loop->events = queue_new_parent(loop_on_put, loop);
|
||||||
loop->fast_events = queue_new_child(loop->events);
|
loop->fast_events = queue_new_child(loop->events);
|
||||||
uv_signal_init(&loop->uv, &loop->children_watcher);
|
uv_signal_init(&loop->uv, &loop->children_watcher);
|
||||||
uv_timer_init(&loop->uv, &loop->children_kill_timer);
|
uv_timer_init(&loop->uv, &loop->children_kill_timer);
|
||||||
@ -59,7 +59,7 @@ void loop_poll_events(Loop *loop, int ms)
|
|||||||
queue_process_events(loop->fast_events);
|
queue_process_events(loop->fast_events);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void on_put(Queue *queue, void *data)
|
void loop_on_put(Queue *queue, void *data)
|
||||||
{
|
{
|
||||||
Loop *loop = data;
|
Loop *loop = data;
|
||||||
// Sometimes libuv will run pending callbacks(timer for example) before
|
// Sometimes libuv will run pending callbacks(timer for example) before
|
||||||
|
@ -152,7 +152,7 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
|
|||||||
/// indistinguishable from the process returning -1 by itself. Which
|
/// indistinguishable from the process returning -1 by itself. Which
|
||||||
/// is possible on some OS. Returns -2 if an user has interruped the
|
/// is possible on some OS. Returns -2 if an user has interruped the
|
||||||
/// wait.
|
/// wait.
|
||||||
int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
|
int process_wait(Process *proc, int ms, Queue *events) FUNC_ATTR_NONNULL_ARG(1)
|
||||||
{
|
{
|
||||||
// The default status is -1, which represents a timeout
|
// The default status is -1, which represents a timeout
|
||||||
int status = -1;
|
int status = -1;
|
||||||
@ -162,10 +162,14 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
|
|||||||
return proc->status;
|
return proc->status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!events) {
|
||||||
|
events = proc->events;
|
||||||
|
}
|
||||||
|
|
||||||
// Increase refcount to stop the exit callback from being called(and possibly
|
// Increase refcount to stop the exit callback from being called(and possibly
|
||||||
// being freed) before we have a chance to get the status.
|
// being freed) before we have a chance to get the status.
|
||||||
proc->refcount++;
|
proc->refcount++;
|
||||||
LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, ms,
|
LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, ms,
|
||||||
// Until...
|
// Until...
|
||||||
got_int || // interrupted by the user
|
got_int || // interrupted by the user
|
||||||
proc->refcount == 1); // job exited
|
proc->refcount == 1); // job exited
|
||||||
@ -179,10 +183,10 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
|
|||||||
if (ms == -1) {
|
if (ms == -1) {
|
||||||
// We can only return if all streams/handles are closed and the job
|
// We can only return if all streams/handles are closed and the job
|
||||||
// exited.
|
// exited.
|
||||||
LOOP_PROCESS_EVENTS_UNTIL(proc->loop, proc->events, -1,
|
LOOP_PROCESS_EVENTS_UNTIL(proc->loop, events, -1,
|
||||||
proc->refcount == 1);
|
proc->refcount == 1);
|
||||||
} else {
|
} else {
|
||||||
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
|
LOOP_PROCESS_EVENTS(proc->loop, events, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,9 +195,9 @@ int process_wait(Process *proc, int ms) FUNC_ATTR_NONNULL_ALL
|
|||||||
// resources
|
// resources
|
||||||
status = interrupted ? -2 : proc->status;
|
status = interrupted ? -2 : proc->status;
|
||||||
decref(proc);
|
decref(proc);
|
||||||
if (proc->events) {
|
if (events) {
|
||||||
// the decref call created an exit event, process it now
|
// the decref call created an exit event, process it now
|
||||||
queue_process_events(proc->events);
|
queue_process_events(events);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
proc->refcount--;
|
proc->refcount--;
|
||||||
|
@ -152,6 +152,12 @@ bool queue_empty(Queue *queue)
|
|||||||
return QUEUE_EMPTY(&queue->headtail);
|
return QUEUE_EMPTY(&queue->headtail);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void queue_replace_parent(Queue *queue, Queue *new_parent)
|
||||||
|
{
|
||||||
|
assert(queue_empty(queue));
|
||||||
|
queue->parent = new_parent;
|
||||||
|
}
|
||||||
|
|
||||||
static Event queue_remove(Queue *queue)
|
static Event queue_remove(Queue *queue)
|
||||||
{
|
{
|
||||||
assert(!queue_empty(queue));
|
assert(!queue_empty(queue));
|
||||||
|
@ -257,7 +257,7 @@ static int do_os_system(char **argv,
|
|||||||
// the UI
|
// the UI
|
||||||
ui_busy_start();
|
ui_busy_start();
|
||||||
ui_flush();
|
ui_flush();
|
||||||
int status = process_wait(proc, -1);
|
int status = process_wait(proc, -1, NULL);
|
||||||
ui_busy_stop();
|
ui_busy_stop();
|
||||||
|
|
||||||
// prepare the out parameters if requested
|
// prepare the out parameters if requested
|
||||||
|
Loading…
Reference in New Issue
Block a user