Prevent too early sending of delayed notifications.

Notifications for a channel will be sent directly if there are no
pending requests (for this channel). Otherwise notifications are queued
for later sending.

But in two cases a notification could be sent with pending requests:
* Broadcasting a notification
* A channel that has just finished its last pending request
  would call send_delayed_notifications() for all channels.

To prevent this, every channel can now only send its own delayed
notifications and broadcasting checks for pending requests.
This commit is contained in:
oni-link 2015-02-12 13:24:41 +01:00 committed by Thiago de Arruda
parent ab02637592
commit db3ae72d19

View File

@ -59,6 +59,7 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
} Channel;
typedef struct {
@ -68,18 +69,10 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
typedef struct {
Channel *channel;
String method;
Array args;
} DelayedNotification;
#define _noop(x)
KMEMPOOL_INIT(RequestEventPool, RequestEvent, _noop)
KLIST_INIT(DelayedNotification, DelayedNotification, _noop)
static kmempool_t(RequestEventPool) *request_event_pool = NULL;
static klist_t(DelayedNotification) *delayed_notifications = NULL;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
@ -93,7 +86,6 @@ static msgpack_sbuffer out_buffer;
void channel_init(void)
{
request_event_pool = kmp_init(RequestEventPool);
delayed_notifications = kl_init(DelayedNotification);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
@ -191,13 +183,10 @@ bool channel_send_event(uint64_t id, char *name, Array args)
if (channel) {
if (channel->pending_requests) {
DelayedNotification p = {
.channel = channel,
.method = cstr_to_string(name),
.args = args
};
// Pending request, queue the notification for sending later
*kl_pushp(DelayedNotification, delayed_notifications) = p;
// Pending request, queue the notification for later sending.
String method = cstr_as_string(name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(WBuffer *, channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
@ -248,7 +237,7 @@ Object channel_send_call(uint64_t id,
}
if (!channel->pending_requests) {
send_delayed_notifications();
send_delayed_notifications(channel);
}
decref(channel);
@ -593,7 +582,12 @@ static void broadcast_event(char *name, Array args)
kv_size(subscribed));
for (size_t i = 0; i < kv_size(subscribed); i++) {
channel_write(kv_A(subscribed, i), buffer);
Channel *channel = kv_A(subscribed, i);
if (channel->pending_requests) {
kv_push(WBuffer *, channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
}
end:
@ -666,6 +660,7 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
free(channel);
}
@ -686,6 +681,7 @@ static Channel *register_channel(void)
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);
return rv;
}
@ -773,18 +769,14 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}
static void send_delayed_notifications(void)
static void send_delayed_notifications(Channel* channel)
{
DelayedNotification p;
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
channel_write(channel, buffer);
}
while (kl_shift(DelayedNotification, delayed_notifications, &p) == 0) {
if (p.channel) {
send_event(p.channel, p.method.data, p.args);
} else {
broadcast_event(p.method.data, p.args);
}
free(p.method.data);
}
kv_size(channel->delayed_notifications) = 0;
}
static void incref(Channel *channel)