Add channel module

- Add channel module that exposes the API over arbitrary streams
- Add `xmemdup` for duplicating memory chunks
- Make job exit callback optional
This commit is contained in:
Thiago de Arruda 2014-05-07 17:59:57 -03:00
parent b3268d0712
commit f9c06e47c4
8 changed files with 242 additions and 3 deletions

View File

@ -214,6 +214,10 @@ char *xstrndup(const char *str, size_t len)
return xmemdupz(str, p ? (size_t)(p - str) : len);
}
char *xmemdup(const char *data, size_t len)
{
return memcpy(xmalloc(len), data, len);
}
/*
* Avoid repeating the error message many times (they take 1 second each).

View File

@ -127,6 +127,22 @@ char *xstpcpy(char *restrict dst, const char *restrict src);
/// @param maxlen
char *xstpncpy(char *restrict dst, const char *restrict src, size_t maxlen);
/// Duplicates a chunk of memory using xmalloc
///
/// @see {xmalloc}
/// @param data pointer to the chunk
/// @param len size of the chunk
/// @return a pointer
char *xmemdup(const char *data, size_t len)
FUNC_ATTR_MALLOC FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_RET;
/// Old low level memory allocation function.
///
/// @deprecated use xmalloc() directly instead
/// @param size
/// @return pointer to allocated space. Never NULL
char_u *lalloc(long_u size, int message) FUNC_ATTR_MALLOC FUNC_ATTR_ALLOC_SIZE(1);
void do_outofmem_msg(long_u size);
void free_all_mem(void);

175
src/os/channel.c Normal file
View File

@ -0,0 +1,175 @@
#include <string.h>
#include <uv.h>
#include <msgpack.h>
#include "lib/klist.h"
#include "os/channel.h"
#include "os/channel_defs.h"
#include "os/rstream.h"
#include "os/rstream_defs.h"
#include "os/wstream.h"
#include "os/wstream_defs.h"
#include "os/job.h"
#include "os/job_defs.h"
#include "os/msgpack_rpc.h"
#include "vim.h"
#include "memory.h"
typedef struct {
ChannelProtocol protocol;
bool is_job;
union {
struct {
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
} msgpack;
} proto;
union {
int job_id;
struct {
RStream *read;
WStream *write;
} streams;
} data;
} Channel;
#define _destroy_channel(x)
KLIST_INIT(Channel, Channel *, _destroy_channel)
static klist_t(Channel) *channels = NULL;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
void channel_init()
{
channels = kl_init(Channel);
}
void channel_teardown()
{
if (!channels) {
return;
}
Channel *channel;
while (kl_shift(Channel, channels, &channel) == 0) {
switch (channel->protocol) {
case kChannelProtocolMsgpack:
msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
msgpack_unpacker_free(channel->proto.msgpack.unpacker);
break;
default:
abort();
}
if (channel->is_job) {
job_stop(channel->data.job_id);
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
}
}
}
void channel_from_job(char **argv, ChannelProtocol prot)
{
Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = NULL;
switch (prot) {
case kChannelProtocolMsgpack:
rcb = on_job_stdout;
channel->proto.msgpack.unpacker =
msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
break;
default:
abort();
}
channel->protocol = prot;
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
*kl_pushp(Channel, channels) = channel;
}
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
{
Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = NULL;
switch (prot) {
case kChannelProtocolMsgpack:
rcb = parse_msgpack;
channel->proto.msgpack.unpacker =
msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
break;
default:
abort();
}
stream->data = NULL;
channel->protocol = prot;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
channel->data.streams.write = wstream_new(1024 * 1024);
wstream_set_stream(channel->data.streams.write, stream);
// push to channel list
*kl_pushp(Channel, channels) = channel;
}
static void on_job_stdout(RStream *rstream, void *data, bool eof)
{
Job *job = data;
parse_msgpack(rstream, job_data(job), eof);
}
static void on_job_stderr(RStream *rstream, void *data, bool eof)
{
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
static void parse_msgpack(RStream *rstream, void *data, bool eof)
{
msgpack_unpacked unpacked;
Channel *channel = data;
uint32_t count = rstream_available(rstream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count);
rstream_read(rstream,
msgpack_unpacker_buffer(channel->proto.msgpack.unpacker),
count);
msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
msgpack_unpacked_init(&unpacked);
// Deserialize everything we can.
while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) {
// Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response;
msgpack_packer_init(&response,
channel->proto.msgpack.sbuffer,
msgpack_sbuffer_write);
// Perform the call
msgpack_rpc_call(&unpacked.data, &response);
wstream_write(channel->data.streams.write,
xmemdup(channel->proto.msgpack.sbuffer->data,
channel->proto.msgpack.sbuffer->size),
channel->proto.msgpack.sbuffer->size,
true);
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
}
}

29
src/os/channel.h Normal file
View File

@ -0,0 +1,29 @@
#ifndef NEOVIM_OS_CHANNEL_H
#define NEOVIM_OS_CHANNEL_H
#include <uv.h>
#include "os/channel_defs.h"
/// Initializes the module
void channel_init(void);
/// Teardown the module
void channel_teardown(void);
/// Creates an API channel from a libuv stream representing a tcp or
/// pipe/socket client connection
///
/// @param stream The established connection
/// @param prot The rpc protocol used
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot);
/// Creates an API channel by starting a job and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
/// @param prot The rpc protocol used
void channel_from_job(char **argv, ChannelProtocol prot);
#endif // NEOVIM_OS_CHANNEL_H

8
src/os/channel_defs.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef NEOVIM_OS_CHANNEL_DEFS_H
#define NEOVIM_OS_CHANNEL_DEFS_H
typedef enum {
kChannelProtocolMsgpack
} ChannelProtocol;
#endif // NEOVIM_OS_CHANNEL_DEFS_H

View File

@ -7,6 +7,7 @@
#include "lib/klist.h"
#include "os/event.h"
#include "os/input.h"
#include "os/channel.h"
#include "os/signal.h"
#include "os/rstream.h"
#include "os/job.h"
@ -36,6 +37,8 @@ void event_init()
signal_init();
// Jobs
job_init();
// Channels
channel_init();
uv_timer_init(uv_default_loop(), &timer);
// This prepare handle that actually starts the timer
uv_prepare_init(uv_default_loop(), &timer_prepare);
@ -43,6 +46,7 @@ void event_init()
void event_teardown()
{
channel_teardown();
job_teardown();
}

View File

@ -250,8 +250,10 @@ void job_exit_event(Event event)
// this one
table[job->id - 1] = NULL;
if (job->exit_cb) {
// Invoke the exit callback
job->exit_cb(job, job->data);
}
// Free the job resources
free_job(job);

View File

@ -28,7 +28,8 @@ void job_teardown(void);
/// on stdout
/// @param stderr_cb Callback that will be invoked when data is available
/// on stderr
/// @param exit_cb Callback that will be invoked when the job exits
/// @param exit_cb Callback that will be invoked when the job exits. This is
/// optional.
/// @return The job id if the job started successfully. If the the first item /
/// of `argv`(the program) could not be executed, -1 will be returned.
// 0 will be returned if the job table is full.