API: Refactor: Close/free channels when their streams reach EOF

Thiago de Arruda 2014-05-26 13:39:06 -03:00
parent 4bac5e9ce1
commit 7a00caf7c4


@@ -3,7 +3,6 @@
 #include <uv.h>
 #include <msgpack.h>
 
-#include "nvim/lib/klist.h"
 #include "nvim/os/channel.h"
 #include "nvim/os/channel_defs.h"
 #include "nvim/os/rstream.h"
@@ -15,8 +14,10 @@
 #include "nvim/os/msgpack_rpc.h"
 #include "nvim/vim.h"
 #include "nvim/memory.h"
+#include "nvim/map.h"
 
 typedef struct {
+  uint64_t id;
   ChannelProtocol protocol;
   bool is_job;
   union {
@@ -30,22 +31,23 @@ typedef struct {
     struct {
       RStream *read;
       WStream *write;
+      uv_stream_t *uv;
     } streams;
   } data;
 } Channel;
 
-#define _destroy_channel(x)
-KLIST_INIT(Channel, Channel *, _destroy_channel)
-
-static klist_t(Channel) *channels = NULL;
+static uint64_t next_id = 1;
+static Map(uint64_t) *channels = NULL;
 
 static void on_job_stdout(RStream *rstream, void *data, bool eof);
 static void on_job_stderr(RStream *rstream, void *data, bool eof);
 static void parse_msgpack(RStream *rstream, void *data, bool eof);
+static void close_channel(Channel *channel);
+static void close_cb(uv_handle_t *handle);
 
 void channel_init()
 {
-  channels = kl_init(Channel);
+  channels = map_new(uint64_t)();
 }
 
 void channel_teardown()
@@ -56,24 +58,9 @@ void channel_teardown()
   Channel *channel;
 
-  while (kl_shift(Channel, channels, &channel) == 0) {
-    switch (channel->protocol) {
-      case kChannelProtocolMsgpack:
-        msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
-        msgpack_unpacker_free(channel->proto.msgpack.unpacker);
-        break;
-      default:
-        abort();
-    }
-
-    if (channel->is_job) {
-      job_stop(channel->data.job_id);
-    } else {
-      rstream_free(channel->data.streams.read);
-      wstream_free(channel->data.streams.write);
-    }
-  }
+  map_foreach_value(channels, channel, {
+    close_channel(channel);
+  });
 }
 
 void channel_from_job(char **argv, ChannelProtocol prot)
@@ -92,10 +79,11 @@ void channel_from_job(char **argv, ChannelProtocol prot)
     abort();
   }
 
+  channel->id = next_id++;
   channel->protocol = prot;
   channel->is_job = true;
   channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
-  *kl_pushp(Channel, channels) = channel;
+  map_put(uint64_t)(channels, channel->id, channel);
 }
 
 void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
@@ -115,6 +103,7 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
   }
 
   stream->data = NULL;
+  channel->id = next_id++;
   channel->protocol = prot;
   channel->is_job = false;
   // read stream
@@ -124,8 +113,8 @@ void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
   // write stream
   channel->data.streams.write = wstream_new(1024 * 1024);
   wstream_set_stream(channel->data.streams.write, stream);
-  // push to channel list
-  *kl_pushp(Channel, channels) = channel;
+  channel->data.streams.uv = stream;
+  map_put(uint64_t)(channels, channel->id, channel);
 }
 
 static void on_job_stdout(RStream *rstream, void *data, bool eof)
@@ -141,8 +130,13 @@ static void on_job_stderr(RStream *rstream, void *data, bool eof)
 
 static void parse_msgpack(RStream *rstream, void *data, bool eof)
 {
-  msgpack_unpacked unpacked;
   Channel *channel = data;
+
+  if (eof) {
+    close_channel(channel);
+    return;
+  }
+
   uint32_t count = rstream_available(rstream);
 
   // Feed the unpacker with data
@@ -152,6 +146,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
          count);
   msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
 
+  msgpack_unpacked unpacked;
   msgpack_unpacked_init(&unpacked);
 
   // Deserialize everything we can.
@@ -173,3 +168,34 @@
     msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
   }
 }
+
+static void close_channel(Channel *channel)
+{
+  map_del(uint64_t)(channels, channel->id);
+
+  switch (channel->protocol) {
+    case kChannelProtocolMsgpack:
+      msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
+      msgpack_unpacker_free(channel->proto.msgpack.unpacker);
+      break;
+    default:
+      abort();
+  }
+
+  if (channel->is_job) {
+    job_stop(channel->data.job_id);
+  } else {
+    rstream_free(channel->data.streams.read);
+    wstream_free(channel->data.streams.write);
+    uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
+  }
+
+  free(channel);
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+  free(handle->data);
+  free(handle);
+}
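Note on the new close_cb(): it relies on libuv's deferred close semantics. uv_close() only requests the close; the callback runs on a later event-loop iteration, so the uv_stream_t (and whatever hangs off its data field) must stay allocated until the callback fires, which is why the memory is released there rather than in close_channel(). Below is a minimal standalone sketch of that pattern, independent of this commit's channel code; the handle setup and names (on_closed, the 64-byte "context") are hypothetical and only illustrate the close/free ordering.

// Minimal sketch of libuv's deferred-close pattern (hypothetical example,
// not Neovim code): free the handle and its attached context only inside
// the close callback, never right after calling uv_close().
#include <stdlib.h>
#include <uv.h>

static void on_closed(uv_handle_t *handle)
{
  // libuv is done with the handle here, so releasing memory is safe.
  free(handle->data);  // per-handle context attached below
  free(handle);
}

int main(void)
{
  uv_loop_t *loop = uv_default_loop();

  // A pipe handle standing in for a channel's uv_stream_t.
  uv_pipe_t *stream = malloc(sizeof(*stream));
  uv_pipe_init(loop, stream, 0);
  stream->data = malloc(64);  // pretend per-channel context

  // Request the close; freeing `stream` here instead would risk a
  // use-after-free once the loop touches the handle again.
  uv_close((uv_handle_t *)stream, on_closed);

  // on_closed runs during this loop run, after which the loop has
  // no live handles and uv_run returns.
  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}

The switch from a klist to a Map keyed by a uint64_t id serves the same goal: map_del() can drop a single channel when its stream reaches EOF, instead of the whole list only being torn down at exit in channel_teardown().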