Refactor job control module to use RStream class

This commit is contained in:
Thiago de Arruda 2014-04-15 15:34:16 -03:00
parent 001d05541b
commit 6e4e40a0f7
5 changed files with 63 additions and 85 deletions

View File

@ -65,6 +65,8 @@
#include "os/os.h" #include "os/os.h"
#include "os/job.h" #include "os/job.h"
#include "os/shell.h" #include "os/shell.h"
#include "os/rstream.h"
#include "os/rstream_defs.h"
#if defined(FEAT_FLOAT) #if defined(FEAT_FLOAT)
# include <math.h> # include <math.h>
@ -406,8 +408,7 @@ static dictitem_T vimvars_var; /* variable used for v: */
static void apply_job_autocmds(int id, static void apply_job_autocmds(int id,
void *data, void *data,
char *buffer, RStream *target,
uint32_t len,
bool from_stdout); bool from_stdout);
static void prepare_vimvar(int idx, typval_T *save_tv); static void prepare_vimvar(int idx, typval_T *save_tv);
static void restore_vimvar(int idx, typval_T *save_tv); static void restore_vimvar(int idx, typval_T *save_tv);
@ -19798,18 +19799,29 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, char_u *flags)
static void apply_job_autocmds(int id, static void apply_job_autocmds(int id,
void *data, void *data,
char *buffer, RStream *target,
uint32_t len,
bool from_stdout) bool from_stdout)
{ {
list_T *list; list_T *list;
char *str;
listitem_T *str_slot = listitem_alloc();
uint32_t read_count = rstream_available(target);
// Call JobActivity autocommands // Prepare the list item containing the data read
str = xmalloc(read_count + 1);
rstream_read(target, str, read_count);
str[read_count] = NUL;
str_slot->li_tv.v_type = VAR_STRING;
str_slot->li_tv.v_lock = 0;
str_slot->li_tv.vval.v_string = (char_u *)str;
// Create the list which will be set to v:job_data
list = list_alloc(); list = list_alloc();
list_append_number(list, id); list_append_number(list, id);
list_append_string(list, (char_u *)buffer, len); list_append(list, str_slot);
list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3); list_append_string(list, (char_u *)(from_stdout ? "out" : "err"), 3);
// Update v:job_data for the autocommands
set_vim_var_list(VV_JOB_DATA, list); set_vim_var_list(VV_JOB_DATA, list);
// Call JobActivity autocommands
apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL); apply_autocmds(EVENT_JOBACTIVITY, (char_u *)data, NULL, TRUE, NULL);
} }

View File

@ -2,6 +2,7 @@
#define NEOVIM_OS_EVENT_DEFS_H #define NEOVIM_OS_EVENT_DEFS_H
#include "os/job_defs.h" #include "os/job_defs.h"
#include "os/rstream_defs.h"
typedef enum { typedef enum {
kEventSignal, kEventSignal,
@ -12,7 +13,11 @@ typedef struct {
EventType type; EventType type;
union { union {
int signum; int signum;
Job *job; struct {
Job *ptr;
RStream *target;
bool from_stdout;
} job;
} data; } data;
} Event; } Event;

View File

@ -5,6 +5,8 @@
#include "os/job.h" #include "os/job.h"
#include "os/job_defs.h" #include "os/job_defs.h"
#include "os/rstream.h"
#include "os/rstream_defs.h"
#include "os/time.h" #include "os/time.h"
#include "os/shell.h" #include "os/shell.h"
#include "vim.h" #include "vim.h"
@ -15,13 +17,6 @@
#define MAX_RUNNING_JOBS 100 #define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 1024 #define JOB_BUFFER_SIZE 1024
/// Possible lock states of the job buffer
typedef enum {
kBufferLockNone = 0, ///< No data was read
kBufferLockStdout, ///< Data read from stdout
kBufferLockStderr ///< Data read from stderr
} BufferLock;
struct job { struct job {
// Job id the index in the job table plus one. // Job id the index in the job table plus one.
int id; int id;
@ -31,14 +26,10 @@ struct job {
bool stopped; bool stopped;
// Data associated with the job // Data associated with the job
void *data; void *data;
// Buffer for reading from stdout or stderr
char buffer[JOB_BUFFER_SIZE];
// Size of the data from the last read
uint32_t length;
// Buffer lock state
BufferLock lock;
// Callback for consuming data from the buffer // Callback for consuming data from the buffer
job_read_cb read_cb; job_read_cb read_cb;
// Readable streams
RStream *out, *err;
// Structures for process spawning/management used by libuv // Structures for process spawning/management used by libuv
uv_process_t proc; uv_process_t proc;
uv_process_options_t proc_opts; uv_process_options_t proc_opts;
@ -49,6 +40,7 @@ struct job {
static Job *table[MAX_RUNNING_JOBS] = {NULL}; static Job *table[MAX_RUNNING_JOBS] = {NULL};
static uv_prepare_t job_prepare; static uv_prepare_t job_prepare;
static void read_cb(RStream *rstream, void *data, bool eof);
// Some helpers shared in this module // Some helpers shared in this module
static bool is_alive(Job *job); static bool is_alive(Job *job);
static Job * find_job(int id); static Job * find_job(int id);
@ -56,8 +48,7 @@ static void free_job(Job *job);
// Callbacks for libuv // Callbacks for libuv
static void job_prepare_cb(uv_prepare_t *handle); static void job_prepare_cb(uv_prepare_t *handle);
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf); // static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf);
static void write_cb(uv_write_t *req, int status); static void write_cb(uv_write_t *req, int status);
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal);
@ -154,12 +145,10 @@ int job_start(char **argv, void *data, job_read_cb cb)
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
job->proc_stdout.data = job;
job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
job->proc_stderr.data = job;
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
@ -170,8 +159,12 @@ int job_start(char **argv, void *data, job_read_cb cb)
} }
// Start the readable streams // Start the readable streams
uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb); job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb); job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
rstream_start(job->err);
// Give the callback a reference to the job // Give the callback a reference to the job
job->proc.data = job; job->proc.data = job;
// Save the job to the table // Save the job to the table
@ -217,19 +210,13 @@ bool job_write(int id, char *data, uint32_t len)
void job_handle(Event event) void job_handle(Event event)
{ {
Job *job = event.data.job; Job *job = event.data.job.ptr;
// Invoke the job callback // Invoke the job callback
job->read_cb(job->id, job->read_cb(job->id,
job->data, job->data,
job->buffer, event.data.job.target,
job->length, event.data.job.from_stdout);
job->lock == kBufferLockStdout);
// restart reading
job->lock = kBufferLockNone;
uv_read_start((uv_stream_t *)&job->proc_stdout, alloc_cb, read_cb);
uv_read_start((uv_stream_t *)&job->proc_stderr, alloc_cb, read_cb);
} }
static bool is_alive(Job *job) static bool is_alive(Job *job)
@ -252,6 +239,8 @@ static void free_job(Job *job)
uv_close((uv_handle_t *)&job->proc_stdin, NULL); uv_close((uv_handle_t *)&job->proc_stdin, NULL);
uv_close((uv_handle_t *)&job->proc_stderr, NULL); uv_close((uv_handle_t *)&job->proc_stderr, NULL);
uv_close((uv_handle_t *)&job->proc, NULL); uv_close((uv_handle_t *)&job->proc, NULL);
rstream_free(job->out);
rstream_free(job->err);
free(job); free(job);
} }
@ -277,50 +266,22 @@ static void job_prepare_cb(uv_prepare_t *handle)
} }
} }
/// Puts the job into a 'reading state' which 'locks' the job buffer
/// until the data is consumed
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
Job *job = (Job *)handle->data;
if (job->lock != kBufferLockNone) {
// Already reserved the buffer for reading from stdout or stderr.
buf->len = 0;
return;
}
buf->base = job->buffer;
buf->len = JOB_BUFFER_SIZE;
// Avoid `alloc_cb`, `alloc_cb` sequences on windows and also mark which
// stream we are reading from
job->lock =
(handle == (uv_handle_t *)&job->proc_stdout) ?
kBufferLockStdout :
kBufferLockStderr;
}
/// Pushes a event object to the event queue, which will be handled later by /// Pushes a event object to the event queue, which will be handled later by
/// `job_handle` /// `job_handle`
static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) static void read_cb(RStream *rstream, void *data, bool eof)
{ {
Event event; Event event;
Job *job = (Job *)stream->data; Job *job = data;
// pause reading on both streams
uv_read_stop((uv_stream_t *)&job->proc_stdout);
uv_read_stop((uv_stream_t *)&job->proc_stderr);
if (cnt <= 0) { if (eof) {
if (cnt != UV_ENOBUFS) { uv_process_kill(&job->proc, SIGTERM);
// Assume it's EOF and exit the job. Doesn't harm sending a SIGTERM
// at this point
uv_process_kill(&job->proc, SIGTERM);
}
return; return;
} }
job->length = cnt;
event.type = kEventJobActivity; event.type = kEventJobActivity;
event.data.job = job; event.data.job.ptr = job;
event.data.job.target = rstream;
event.data.job.from_stdout = rstream == job->out;
event_push(event); event_push(event);
} }

View File

@ -12,21 +12,6 @@
#include "os/event.h" #include "os/event.h"
/// Function called when the job reads data
///
/// @param id The job is
/// @param data Some data associated with the job by the caller
/// @param buffer Buffer containing the data read. It must be copied
/// immediately.
/// @param len Amount of bytes that must be read from `buffer`
/// @param from_stdout This is true if data was read from the job's stdout,
/// false if it came from stderr.
typedef void (*job_read_cb)(int id,
void *data,
char *buffer,
uint32_t len,
bool from_stdout);
/// Initializes job control resources /// Initializes job control resources
void job_init(void); void job_init(void);

View File

@ -1,6 +1,21 @@
#ifndef NEOVIM_OS_JOB_DEFS_H #ifndef NEOVIM_OS_JOB_DEFS_H
#define NEOVIM_OS_JOB_DEFS_H #define NEOVIM_OS_JOB_DEFS_H
#include "os/rstream_defs.h"
typedef struct job Job; typedef struct job Job;
/// Function called when the job reads data
///
/// @param id The job id
/// @param data Some data associated with the job by the caller
/// @param target The `RStream` instance containing data to be read
/// @param from_stdout This is true if data was read from the job's stdout,
/// false if it came from stderr.
typedef void (*job_read_cb)(int id,
void *data,
RStream *target,
bool from_stdout);
#endif // NEOVIM_OS_JOB_DEFS_H #endif // NEOVIM_OS_JOB_DEFS_H