Refactor job.c module to use WStream

After a job has accumulated 1mb of stdin data we assume that it's stuck and kill
it.
This commit is contained in:
Thiago de Arruda 2014-04-17 16:24:43 -03:00
parent 913e92324a
commit d59034ea93

View File

@ -7,6 +7,8 @@
#include "os/job_defs.h" #include "os/job_defs.h"
#include "os/rstream.h" #include "os/rstream.h"
#include "os/rstream_defs.h" #include "os/rstream_defs.h"
#include "os/wstream.h"
#include "os/wstream_defs.h"
#include "os/event.h" #include "os/event.h"
#include "os/event_defs.h" #include "os/event_defs.h"
#include "os/time.h" #include "os/time.h"
@ -18,6 +20,7 @@
#define EXIT_TIMEOUT 25 #define EXIT_TIMEOUT 25
#define MAX_RUNNING_JOBS 100 #define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 1024 #define JOB_BUFFER_SIZE 1024
#define JOB_WRITE_MAXMEM 1024 * 1024
struct job { struct job {
// Job id the index in the job table plus one. // Job id the index in the job table plus one.
@ -38,8 +41,10 @@ struct job {
// Callbacks // Callbacks
job_exit_cb exit_cb; job_exit_cb exit_cb;
rstream_cb stdout_cb, stderr_cb; rstream_cb stdout_cb, stderr_cb;
// Readable streams // Readable streams(std{out,err})
RStream *out, *err; RStream *out, *err;
// Writable stream(stdin)
WStream *in;
// Structures for process spawning/management used by libuv // Structures for process spawning/management used by libuv
uv_process_t proc; uv_process_t proc;
uv_process_options_t proc_opts; uv_process_options_t proc_opts;
@ -57,7 +62,6 @@ static void free_job(Job *job);
// Callbacks for libuv // Callbacks for libuv
static void job_prepare_cb(uv_prepare_t *handle); static void job_prepare_cb(uv_prepare_t *handle);
static void write_cb(uv_write_t *req, int status);
static void read_cb(RStream *rstream, void *data, bool eof); static void read_cb(RStream *rstream, void *data, bool eof);
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal); static void exit_cb(uv_process_t *proc, int64_t status, int term_signal);
static void close_cb(uv_handle_t *handle); static void close_cb(uv_handle_t *handle);
@ -177,6 +181,8 @@ int job_start(char **argv,
return -1; return -1;
} }
job->in = wstream_new(JOB_WRITE_MAXMEM);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams // Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true); job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, true);
@ -209,8 +215,6 @@ bool job_stop(int id)
bool job_write(int id, char *data, uint32_t len) bool job_write(int id, char *data, uint32_t len)
{ {
uv_buf_t uvbuf;
uv_write_t *req;
Job *job = find_job(id); Job *job = find_job(id);
if (job == NULL || job->stopped) { if (job == NULL || job->stopped) {
@ -218,11 +222,10 @@ bool job_write(int id, char *data, uint32_t len)
return false; return false;
} }
req = xmalloc(sizeof(uv_write_t)); if (!wstream_write(job->in, data, len, true)) {
req->data = data; job_stop(job->id);
uvbuf.base = data; return false;
uvbuf.len = len; }
uv_write(req, (uv_stream_t *)&job->proc_stdin, &uvbuf, 1, write_cb);
return true; return true;
} }
@ -270,9 +273,6 @@ static void free_job(Job *job)
uv_close((uv_handle_t *)&job->proc_stdin, close_cb); uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
uv_close((uv_handle_t *)&job->proc_stderr, close_cb); uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
uv_close((uv_handle_t *)&job->proc, close_cb); uv_close((uv_handle_t *)&job->proc, close_cb);
rstream_free(job->out);
rstream_free(job->err);
wstream_free(job->in);
} }
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
@ -297,12 +297,6 @@ static void job_prepare_cb(uv_prepare_t *handle)
} }
} }
static void write_cb(uv_write_t *req, int status)
{
free(req->data);
free(req);
}
// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary. // Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
static void read_cb(RStream *rstream, void *data, bool eof) static void read_cb(RStream *rstream, void *data, bool eof)
{ {
@ -344,6 +338,9 @@ static void close_cb(uv_handle_t *handle)
if (--job->pending_closes == 0) { if (--job->pending_closes == 0) {
// Only free the job memory after all the associated handles are properly // Only free the job memory after all the associated handles are properly
// closed by libuv // closed by libuv
rstream_free(job->out);
rstream_free(job->err);
wstream_free(job->in);
free(job); free(job);
} }
} }