eval: Implement jobwait() vimscript function

This commit is contained in:
Thiago de Arruda 2015-03-28 11:30:23 -03:00
parent 6e7757ad51
commit 028f6d7d3f
5 changed files with 232 additions and 6 deletions

View File

@ -4061,6 +4061,24 @@ jobstop({job}) {Nvim} *jobstop()*
`v:job_data[0]` set to `exited`. See |job-control| for more `v:job_data[0]` set to `exited`. See |job-control| for more
information. information.
jobwait({ids}[, {timeout}]) {Nvim} *jobwait()*
Wait for a set of jobs to finish. The {ids} argument is a list
of ids for jobs that will be waited for. If passed, {timeout}
is the maximum number of milliseconds to wait. While this
function is executing, callbacks for jobs not in the {ids}
list can be executed. Also, the screen wont be updated unless
|:redraw| is invoked by one of the callbacks.
Returns a list of integers with the same length as {ids}, with
each integer representing the wait result for the
corresponding job id. The possible values for the resulting
integers are:
* the job return code if the job exited
* -1 if the wait timed out for the job
* -2 if the job was interrupted
* -3 if the job id is invalid.
join({list} [, {sep}]) *join()* join({list} [, {sep}]) *join()*
Join the items in {list} together into one String. Join the items in {list} together into one String.
When {sep} is specified it is put in between the items. If When {sep} is specified it is put in between the items. If

View File

@ -448,6 +448,7 @@ typedef struct {
int refcount; int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit; ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self; dict_T *self;
int *status_ptr;
} TerminalJobData; } TerminalJobData;
@ -470,6 +471,7 @@ typedef struct {
#define JobEventFreer(x) #define JobEventFreer(x)
KMEMPOOL_INIT(JobEventPool, JobEvent, JobEventFreer) KMEMPOOL_INIT(JobEventPool, JobEvent, JobEventFreer)
static kmempool_t(JobEventPool) *job_event_pool = NULL; static kmempool_t(JobEventPool) *job_event_pool = NULL;
static bool defer_job_callbacks = true;
/* /*
* Initialize the global and v: variables. * Initialize the global and v: variables.
@ -6537,6 +6539,7 @@ static struct fst {
{"jobsend", 2, 2, f_jobsend}, {"jobsend", 2, 2, f_jobsend},
{"jobstart", 1, 2, f_jobstart}, {"jobstart", 1, 2, f_jobstart},
{"jobstop", 1, 1, f_jobstop}, {"jobstop", 1, 1, f_jobstop},
{"jobwait", 1, 2, f_jobwait},
{"join", 1, 2, f_join}, {"join", 1, 2, f_join},
{"keys", 1, 1, f_keys}, {"keys", 1, 1, f_keys},
{"last_buffer_nr", 0, 0, f_last_buffer_nr}, /* obsolete */ {"last_buffer_nr", 0, 0, f_last_buffer_nr}, /* obsolete */
@ -10841,6 +10844,105 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv)
rettv->vval.v_number = 1; rettv->vval.v_number = 1;
} }
// "jobwait(ids[, timeout])" function
static void f_jobwait(typval_T *argvars, typval_T *rettv)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_LIST || (argvars[1].v_type != VAR_NUMBER
&& argvars[1].v_type != VAR_UNKNOWN)) {
EMSG(_(e_invarg));
return;
}
list_T *args = argvars[0].vval.v_list;
list_T *rv = list_alloc();
// must temporarily disable job event deferring so the callbacks are
// processed while waiting.
defer_job_callbacks = false;
// For each item in the input list append an integer to the output list. -3
// is used to represent an invalid job id, -2 is for a interrupted job and
// -1 for jobs that were skipped or timed out.
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
Job *job = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(job = job_find(arg->li_tv.vval.v_number))
|| !is_user_job(job)) {
list_append_number(rv, -3);
} else {
TerminalJobData *data = job_data(job);
// append the list item and set the status pointer so we'll collect the
// status code when the job exits
list_append_number(rv, -1);
data->status_ptr = &rv->lv_last->li_tv.vval.v_number;
}
}
int remaining = -1;
uint64_t before = 0;
if (argvars[1].v_type == VAR_NUMBER && argvars[1].vval.v_number >= 0) {
remaining = argvars[1].vval.v_number;
before = os_hrtime();
}
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
Job *job = NULL;
if (remaining == 0) {
// timed out
break;
}
if (arg->li_tv.v_type != VAR_NUMBER
|| !(job = job_find(arg->li_tv.vval.v_number))
|| !is_user_job(job)) {
continue;
}
TerminalJobData *data = job_data(job);
int status = job_wait(job, remaining);
if (status < 0) {
// interrupted or timed out, skip remaining jobs.
if (status == -2) {
// set the status so the user can distinguish between interrupted and
// skipped/timeout jobs.
*data->status_ptr = -2;
}
break;
}
if (remaining > 0) {
uint64_t now = os_hrtime();
remaining -= (int) ((now - before) / 1000000);
before = now;
if (remaining <= 0) {
break;
}
}
}
for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) {
Job *job = NULL;
if (arg->li_tv.v_type != VAR_NUMBER
|| !(job = job_find(arg->li_tv.vval.v_number))
|| !is_user_job(job)) {
continue;
}
TerminalJobData *data = job_data(job);
// remove the status pointer because the list may be freed before the
// job exits
data->status_ptr = NULL;
}
// restore defer flag
defer_job_callbacks = true;
rv->lv_refcount++;
rettv->v_type = VAR_LIST;
rettv->vval.v_list = rv;
}
/* /*
* "join()" function * "join()" function
*/ */
@ -19951,6 +20053,16 @@ static inline void free_term_job_data(TerminalJobData *data) {
free(data); free(data);
} }
static inline bool is_user_job(Job *job)
{
if (!job) {
return false;
}
JobOptions *opts = job_opts(job);
return opts->exit_cb == on_job_exit;
}
// vimscript job callbacks must be executed on Nvim main loop // vimscript job callbacks must be executed on Nvim main loop
static inline void push_job_event(Job *job, ufunc_T *callback, static inline void push_job_event(Job *job, ufunc_T *callback,
const char *type, char *data, size_t count, int status) const char *type, char *data, size_t count, int status)
@ -19990,7 +20102,7 @@ static inline void push_job_event(Job *job, ufunc_T *callback,
event_push((Event) { event_push((Event) {
.handler = on_job_event, .handler = on_job_event,
.data = event_data .data = event_data
}, true); }, defer_job_callbacks);
} }
static void on_job_stdout(RStream *rstream, void *job, bool eof) static void on_job_stdout(RStream *rstream, void *job, bool eof)
@ -20039,6 +20151,10 @@ static void on_job_exit(Job *job, int status, void *d)
_("\r\n[Program exited, press any key to close]")); _("\r\n[Program exited, press any key to close]"));
} }
if (data->status_ptr) {
*data->status_ptr = status;
}
push_job_event(job, data->on_exit, "exit", NULL, 0, status); push_job_event(job, data->on_exit, "exit", NULL, 0, status);
} }

View File

@ -24,7 +24,6 @@
// before we send SIGNAL to it // before we send SIGNAL to it
#define TERM_TIMEOUT 1000000000 #define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2) #define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 0xFFFF #define JOB_BUFFER_SIZE 0xFFFF
#define close_job_stream(job, stream, type) \ #define close_job_stream(job, stream, type) \
@ -234,11 +233,12 @@ void job_stop(Job *job)
/// @return returns the status code of the exited job. -1 if the job is /// @return returns the status code of the exited job. -1 if the job is
/// still running and the `timeout` has expired. Note that this is /// still running and the `timeout` has expired. Note that this is
/// indistinguishable from the process returning -1 by itself. Which /// indistinguishable from the process returning -1 by itself. Which
/// is possible on some OS. /// is possible on some OS. Returns -2 if the job was interrupted.
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
{ {
// The default status is -1, which represents a timeout // The default status is -1, which represents a timeout
int status = -1; int status = -1;
bool interrupted = false;
// Increase refcount to stop the job from being freed before we have a // Increase refcount to stop the job from being freed before we have a
// chance to get the status. // chance to get the status.
@ -251,6 +251,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
// we'll assume that a user frantically hitting interrupt doesn't like // we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed. // the current job. Signal that it has to be killed.
if (got_int) { if (got_int) {
interrupted = true;
got_int = false; got_int = false;
job_stop(job); job_stop(job);
if (ms == -1) { if (ms == -1) {
@ -265,7 +266,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
if (job->refcount == 1) { if (job->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job // Job exited, collect status and manually invoke close_cb to free the job
// resources // resources
status = job->status; status = interrupted ? -2 : job->status;
job_close_streams(job); job_close_streams(job);
job_decref(job); job_decref(job);
} else { } else {
@ -357,6 +358,11 @@ void job_close_streams(Job *job)
close_job_err(job); close_job_err(job);
} }
JobOptions *job_opts(Job *job)
{
return &job->opts;
}
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0). /// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_stop_timer_cb(uv_timer_t *handle) static void job_stop_timer_cb(uv_timer_t *handle)

View File

@ -5,6 +5,7 @@
#include "nvim/os/rstream_defs.h" #include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream_defs.h" #include "nvim/os/wstream_defs.h"
#define MAX_RUNNING_JOBS 100
typedef struct job Job; typedef struct job Job;
/// Function called when the job reads data /// Function called when the job reads data

View File

@ -4,8 +4,8 @@ local clear, nvim, eq, neq, ok, expect, eval, next_msg, run, stop, session
= helpers.clear, helpers.nvim, helpers.eq, helpers.neq, helpers.ok, = helpers.clear, helpers.nvim, helpers.eq, helpers.neq, helpers.ok,
helpers.expect, helpers.eval, helpers.next_message, helpers.run, helpers.expect, helpers.eval, helpers.next_message, helpers.run,
helpers.stop, helpers.session helpers.stop, helpers.session
local nvim_dir, insert = helpers.nvim_dir, helpers.insert local nvim_dir, insert, feed = helpers.nvim_dir, helpers.insert, helpers.feed
local source = helpers.source local source, execute, wait = helpers.source, helpers.execute, helpers.wait
describe('jobs', function() describe('jobs', function()
@ -170,6 +170,91 @@ describe('jobs', function()
eq({'notification', 'exit', {45, 10}}, next_msg()) eq({'notification', 'exit', {45, 10}}, next_msg())
end) end)
describe('jobwait', function()
it('returns a list of status codes', function()
source([[
call rpcnotify(g:channel, 'wait', jobwait([
\ jobstart([&sh, '-c', 'sleep 0.10; exit 4']),
\ jobstart([&sh, '-c', 'sleep 0.110; exit 5']),
\ jobstart([&sh, '-c', 'sleep 0.210; exit 6']),
\ jobstart([&sh, '-c', 'sleep 0.310; exit 7'])
\ ]))
]])
eq({'notification', 'wait', {{4, 5, 6, 7}}}, next_msg())
end)
it('will run callbacks while waiting', function()
source([[
let g:dict = {'id': 10}
let g:l = []
function g:dict.on_stdout(id, data)
call add(g:l, a:data[0])
endfunction
call jobwait([
\ jobstart([&sh, '-c', 'sleep 0.010; echo 4'], g:dict),
\ jobstart([&sh, '-c', 'sleep 0.030; echo 5'], g:dict),
\ jobstart([&sh, '-c', 'sleep 0.050; echo 6'], g:dict),
\ jobstart([&sh, '-c', 'sleep 0.070; echo 7'], g:dict)
\ ])
call rpcnotify(g:channel, 'wait', g:l)
]])
eq({'notification', 'wait', {{'4', '5', '6', '7'}}}, next_msg())
end)
it('will return status codes in the order of passed ids', function()
source([[
call rpcnotify(g:channel, 'wait', jobwait([
\ jobstart([&sh, '-c', 'sleep 0.070; exit 4']),
\ jobstart([&sh, '-c', 'sleep 0.050; exit 5']),
\ jobstart([&sh, '-c', 'sleep 0.030; exit 6']),
\ jobstart([&sh, '-c', 'sleep 0.010; exit 7'])
\ ]))
]])
eq({'notification', 'wait', {{4, 5, 6, 7}}}, next_msg())
end)
it('will return -3 for invalid job ids', function()
source([[
call rpcnotify(g:channel, 'wait', jobwait([
\ -10,
\ jobstart([&sh, '-c', 'sleep 0.01; exit 5']),
\ ]))
]])
eq({'notification', 'wait', {{-3, 5}}}, next_msg())
end)
it('will return -2 when interrupted', function()
execute('call rpcnotify(g:channel, "ready") | '..
'call rpcnotify(g:channel, "wait", '..
'jobwait([jobstart([&sh, "-c", "sleep 10; exit 55"])]))')
eq({'notification', 'ready', {}}, next_msg())
feed('<c-c>')
eq({'notification', 'wait', {{-2}}}, next_msg())
end)
describe('with timeout argument', function()
it('will return -1 if the wait timed out', function()
source([[
call rpcnotify(g:channel, 'wait', jobwait([
\ jobstart([&sh, '-c', 'sleep 0.05; exit 4']),
\ jobstart([&sh, '-c', 'sleep 0.3; exit 5']),
\ ], 100))
]])
eq({'notification', 'wait', {{4, -1}}}, next_msg())
end)
it('can pass 0 to check if a job exists', function()
source([[
call rpcnotify(g:channel, 'wait', jobwait([
\ jobstart([&sh, '-c', 'sleep 0.05; exit 4']),
\ jobstart([&sh, '-c', 'sleep 0.3; exit 5']),
\ ], 0))
]])
eq({'notification', 'wait', {{-1, -1}}}, next_msg())
end)
end)
end)
-- FIXME need to wait until jobsend succeeds before calling jobstop -- FIXME need to wait until jobsend succeeds before calling jobstop
pending('will only emit the "exit" event after "stdout" and "stderr"', function() pending('will only emit the "exit" event after "stdout" and "stderr"', function()
nvim('command', "let j = jobstart(['cat', '-'], g:job_opts)") nvim('command', "let j = jobstart(['cat', '-'], g:job_opts)")