QUIC: stream flow control refactored.

- Function ngx_quic_control_flow() is introduced.  This functions does
both MAX_DATA and MAX_STREAM_DATA flow controls.  The function is called
from STREAM and RESET_STREAM frame handlers.  Previously, flow control
was only accounted for STREAM.  Also, MAX_DATA flow control was not accounted
at all.

- Function ngx_quic_update_flow() is introduced.  This function advances flow
control windows and sends MAX_DATA/MAX_STREAM_DATA.  The function is called
from RESET_STREAM frame handler, stream cleanup handler and stream recv()
handler.
This commit is contained in:
Roman Arutyunyan 2021-06-07 10:12:46 +03:00
parent dcdf62549f
commit 64586eaa36
4 changed files with 179 additions and 64 deletions

View File

@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_quic_conf_t *conf,
ctp->active_connection_id_limit = 2; ctp->active_connection_id_limit = 2;
qc->streams.recv_max_data = qc->tp.initial_max_data; qc->streams.recv_max_data = qc->tp.initial_max_data;
qc->streams.recv_window = qc->streams.recv_max_data;
qc->streams.client_max_streams_uni = qc->tp.initial_max_streams_uni; qc->streams.client_max_streams_uni = qc->tp.initial_max_streams_uni;
qc->streams.client_max_streams_bidi = qc->tp.initial_max_streams_bidi; qc->streams.client_max_streams_bidi = qc->tp.initial_max_streams_bidi;

View File

@ -75,6 +75,7 @@ struct ngx_quic_stream_s {
uint64_t send_max_data; uint64_t send_max_data;
uint64_t recv_max_data; uint64_t recv_max_data;
uint64_t recv_offset; uint64_t recv_offset;
uint64_t recv_window;
uint64_t recv_last; uint64_t recv_last;
uint64_t final_size; uint64_t final_size;
ngx_chain_t *in; ngx_chain_t *in;

View File

@ -115,8 +115,10 @@ typedef struct {
ngx_rbtree_t tree; ngx_rbtree_t tree;
ngx_rbtree_node_t sentinel; ngx_rbtree_node_t sentinel;
uint64_t received;
uint64_t sent; uint64_t sent;
uint64_t recv_offset;
uint64_t recv_window;
uint64_t recv_last;
uint64_t recv_max_data; uint64_t recv_max_data;
uint64_t send_max_data; uint64_t send_max_data;

View File

@ -25,6 +25,8 @@ static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
ngx_chain_t *in, off_t limit); ngx_chain_t *in, off_t limit);
static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
static void ngx_quic_stream_cleanup_handler(void *data); static void ngx_quic_stream_cleanup_handler(void *data);
static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
ngx_connection_t * ngx_connection_t *
@ -413,6 +415,8 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
} }
} }
qs->recv_window = qs->recv_max_data;
cln = ngx_pool_cleanup_add(pool, 0); cln = ngx_pool_cleanup_add(pool, 0);
if (cln == NULL) { if (cln == NULL) {
ngx_close_connection(sc); ngx_close_connection(sc);
@ -432,18 +436,15 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
static ssize_t static ssize_t
ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
{ {
ssize_t len, n; ssize_t len, n;
ngx_buf_t *b; ngx_buf_t *b;
ngx_chain_t *cl, **ll; ngx_chain_t *cl, **ll;
ngx_event_t *rev; ngx_event_t *rev;
ngx_connection_t *pc; ngx_connection_t *pc;
ngx_quic_frame_t *frame; ngx_quic_stream_t *qs;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
qs = c->quic; qs = c->quic;
pc = qs->parent; pc = qs->parent;
qc = ngx_quic_get_connection(pc);
rev = c->read; rev = c->read;
if (rev->error) { if (rev->error) {
@ -495,10 +496,6 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_quic_free_bufs(pc, cl); ngx_quic_free_bufs(pc, cl);
qc->streams.received += len;
qs->recv_max_data += len;
qs->recv_offset += len;
if (qs->in == NULL) { if (qs->in == NULL) {
rev->ready = rev->pending_eof; rev->ready = rev->pending_eof;
} }
@ -506,39 +503,8 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv len:%z", qs->id, len); "quic stream id:0x%xL recv len:%z", qs->id, len);
if (!rev->pending_eof) { if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
frame = ngx_quic_alloc_frame(pc); return NGX_ERROR;
if (frame == NULL) {
return NGX_ERROR;
}
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
frame->u.max_stream_data.id = qs->id;
frame->u.max_stream_data.limit = qs->recv_max_data;
ngx_quic_queue_frame(qc, frame);
}
if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
}
qc->streams.recv_max_data *= 2;
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_MAX_DATA;
frame->u.max_data.max_data = qc->streams.recv_max_data;
ngx_quic_queue_frame(qc, frame);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv: increased max_data:%uL",
qs->id, qc->streams.recv_max_data);
} }
return len; return len;
@ -729,6 +695,10 @@ ngx_quic_stream_cleanup_handler(void *data)
goto done; goto done;
} }
c->read->pending_eof = 1;
(void) ngx_quic_update_flow(c, qs->recv_last);
if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0 if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
|| (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) || (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
{ {
@ -848,8 +818,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
sc = qs->connection; sc = qs->connection;
if (last > qs->recv_max_data) { if (ngx_quic_control_flow(sc, last) != NGX_OK) {
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
goto cleanup; goto cleanup;
} }
@ -858,8 +827,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
qs->final_size = last; qs->final_size = last;
} }
qs->recv_last = last;
if (f->offset == 0) { if (f->offset == 0) {
sc->read->ready = 1; sc->read->ready = 1;
} }
@ -873,8 +840,15 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_OK; return NGX_OK;
} }
if (last > qs->recv_max_data) { sc = qs->connection;
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
rev = sc->read;
if (rev->error) {
return NGX_OK;
}
if (ngx_quic_control_flow(sc, last) != NGX_OK) {
return NGX_ERROR; return NGX_ERROR;
} }
@ -887,17 +861,11 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_OK; return NGX_OK;
} }
if (qs->recv_last < last) {
qs->recv_last = last;
}
if (f->offset < qs->recv_offset) { if (f->offset < qs->recv_offset) {
ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset); ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset);
f->offset = qs->recv_offset; f->offset = qs->recv_offset;
} }
rev = qs->connection->read;
if (f->fin) { if (f->fin) {
if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
@ -1108,6 +1076,7 @@ ngx_int_t
ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
{ {
ngx_pool_t *pool;
ngx_event_t *rev; ngx_event_t *rev;
ngx_connection_t *sc; ngx_connection_t *sc;
ngx_quic_stream_t *qs; ngx_quic_stream_t *qs;
@ -1135,19 +1104,37 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
return NGX_OK; return NGX_OK;
} }
qs->final_size = f->final_size;
sc = qs->connection; sc = qs->connection;
rev = sc->read; rev = sc->read;
rev->error = 1; rev->error = 1;
rev->ready = 1; rev->ready = 1;
if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
goto cleanup;
}
qs->final_size = f->final_size;
if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
goto cleanup;
}
sc->listening->handler(sc); sc->listening->handler(sc);
return NGX_OK; return NGX_OK;
} }
sc = qs->connection;
rev = sc->read;
rev->error = 1;
rev->ready = 1;
if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
return NGX_ERROR;
}
if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR; return NGX_ERROR;
@ -1160,15 +1147,24 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
qs->final_size = f->final_size; qs->final_size = f->final_size;
rev = qs->connection->read; if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
rev->error = 1; return NGX_ERROR;
rev->ready = 1; }
if (rev->active) { if (rev->active) {
rev->handler(rev); rev->handler(rev);
} }
return NGX_OK; return NGX_OK;
cleanup:
pool = sc->pool;
ngx_close_connection(sc);
ngx_destroy_pool(pool);
return NGX_ERROR;
} }
@ -1285,3 +1281,118 @@ ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
"quic stream ack len:%uL acked:%uL unacked:%uL", "quic stream ack len:%uL acked:%uL unacked:%uL",
f->u.stream.length, qs->acked, sent - qs->acked); f->u.stream.length, qs->acked, sent - qs->acked);
} }
static ngx_int_t
ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
{
uint64_t len;
ngx_event_t *rev;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
rev = c->read;
qs = c->quic;
qc = ngx_quic_get_connection(qs->parent);
if (last <= qs->recv_last) {
return NGX_OK;
}
len = last - qs->recv_last;
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic flow control msd:%uL/%uL md:%uL/%uL",
last, qs->recv_max_data, qc->streams.recv_last + len,
qc->streams.recv_max_data);
qs->recv_last += len;
if (!rev->error && qs->recv_last > qs->recv_max_data) {
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
return NGX_ERROR;
}
qc->streams.recv_last += len;
if (qc->streams.recv_last > qc->streams.recv_max_data) {
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
{
uint64_t len;
ngx_event_t *rev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
rev = c->read;
qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
if (last <= qs->recv_offset) {
return NGX_OK;
}
len = last - qs->recv_offset;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic flow update %uL", last);
qs->recv_offset += len;
if (!rev->pending_eof && !rev->error
&& qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
{
qs->recv_max_data = qs->recv_offset + qs->recv_window;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic flow update msd:%uL", qs->recv_max_data);
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
}
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
frame->u.max_stream_data.id = qs->id;
frame->u.max_stream_data.limit = qs->recv_max_data;
ngx_quic_queue_frame(qc, frame);
}
qc->streams.recv_offset += len;
if (qc->streams.recv_max_data
<= qc->streams.recv_offset + qc->streams.recv_window / 2)
{
qc->streams.recv_max_data = qc->streams.recv_offset
+ qc->streams.recv_window;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic flow update md:%uL", qc->streams.recv_max_data);
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
}
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_MAX_DATA;
frame->u.max_data.max_data = qc->streams.recv_max_data;
ngx_quic_queue_frame(qc, frame);
}
return NGX_OK;
}