QUIC basic congestion control.

This commit is contained in:
Roman Arutyunyan 2020-04-28 16:42:43 +03:00
parent 60c8a601d0
commit 75acaec13a
2 changed files with 212 additions and 10 deletions

View File

@ -53,6 +53,14 @@ typedef struct {
} ngx_quic_streams_t;
typedef struct {
size_t in_flight;
size_t window;
size_t ssthresh;
ngx_msec_t recovery_start;
} ngx_quic_congestion_t;
/*
* 12.3. Packet Numbers
*
@ -103,6 +111,7 @@ struct ngx_quic_connection_s {
#endif
ngx_quic_streams_t streams;
ngx_quic_congestion_t congestion;
ngx_uint_t max_data;
uint64_t cur_streams;
@ -171,6 +180,8 @@ static ngx_int_t ngx_quic_handle_ack_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_ack_frame_t *f);
static ngx_int_t ngx_quic_handle_ack_frame_range(ngx_connection_t *c,
ngx_quic_send_ctx_t *ctx, uint64_t min, uint64_t max);
static void ngx_quic_handle_stream_ack(ngx_connection_t *c,
ngx_quic_frame_t *f);
static ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c,
ngx_quic_frames_stream_t *fs, ngx_quic_frame_t *frame,
@ -227,6 +238,10 @@ static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
static ngx_quic_frame_t *ngx_quic_alloc_frame(ngx_connection_t *c, size_t size);
static void ngx_quic_free_frame(ngx_connection_t *c, ngx_quic_frame_t *frame);
static void ngx_quic_congestion_ack(ngx_connection_t *c,
ngx_quic_frame_t *frame);
static void ngx_quic_congestion_lost(ngx_connection_t *c, ngx_msec_t sent);
static SSL_QUIC_METHOD quic_method = {
#if BORINGSSL_API_VERSION >= 10
@ -586,6 +601,11 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_tp_t *tp,
qc->streams.max_data = qc->tp.initial_max_data;
qc->congestion.window = ngx_min(10 * qc->tp.max_packet_size,
ngx_max(2 * qc->tp.max_packet_size, 14720));
qc->congestion.ssthresh = NGX_MAX_SIZE_T_VALUE;
qc->congestion.recovery_start = ngx_current_msec;
qc->dcid.len = pkt->dcid.len;
qc->dcid.data = ngx_pnalloc(c->pool, pkt->dcid.len);
if (qc->dcid.data == NULL) {
@ -1613,6 +1633,9 @@ ngx_quic_handle_ack_frame_range(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx,
ngx_uint_t found;
ngx_queue_t *q;
ngx_quic_frame_t *f;
ngx_quic_connection_t *qc;
qc = c->quic;
found = 0;
@ -1623,6 +1646,10 @@ ngx_quic_handle_ack_frame_range(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx,
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
if (f->pnum >= min && f->pnum <= max) {
ngx_quic_congestion_ack(c, f);
ngx_quic_handle_stream_ack(c, f);
q = ngx_queue_next(q);
ngx_queue_remove(&f->queue);
ngx_quic_free_frame(c, f);
@ -1646,10 +1673,50 @@ ngx_quic_handle_ack_frame_range(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx,
return NGX_ERROR;
}
if (!qc->push.timer_set) {
ngx_post_event(&qc->push, &ngx_posted_events);
}
return NGX_OK;
}
static void
ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
{
uint64_t sent, unacked;
ngx_event_t *wev;
ngx_quic_stream_t *sn;
ngx_quic_connection_t *qc;
if (f->type < NGX_QUIC_FT_STREAM0 || f->type > NGX_QUIC_FT_STREAM7) {
return;
}
qc = c->quic;
sn = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
if (sn == NULL) {
return;
}
wev = sn->c->write;
sent = sn->c->sent;
unacked = sent - sn->acked;
if (unacked >= NGX_QUIC_STREAM_BUFSIZE && wev->active) {
wev->ready = 1;
ngx_post_event(wev, &ngx_posted_events);
}
sn->acked += f->u.stream.length;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, sn->c->log, 0,
"quic stream ack %uL acked:%uL, unacked:%uL",
f->u.stream.length, sn->acked, sent - sn->acked);
}
static ngx_int_t
ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler)
@ -2263,11 +2330,14 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
{
size_t len, hlen, n;
ngx_int_t rc;
ngx_uint_t need_ack;
ngx_queue_t *q, range;
ngx_quic_frame_t *f;
ngx_quic_congestion_t *cg;
ngx_quic_connection_t *qc;
qc = c->quic;
cg = &qc->congestion;
if (ngx_queue_empty(&ctx->frames)) {
return NGX_OK;
@ -2283,6 +2353,7 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
do {
len = 0;
need_ack = 0;
ngx_queue_init(&range);
do {
@ -2295,6 +2366,14 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
break;
}
if (f->need_ack) {
need_ack = 1;
}
if (need_ack && cg->in_flight + len + n > cg->window) {
break;
}
q = ngx_queue_next(q);
f->first = ngx_current_msec;
@ -2306,6 +2385,10 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
} while (q != ngx_queue_sentinel(&ctx->frames));
if (ngx_queue_empty(&range)) {
break;
}
rc = ngx_quic_send_frames(c, &range);
if (rc == NGX_OK) {
@ -2321,6 +2404,11 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
ngx_queue_add(&ctx->sent, &range);
}
cg->in_flight += len;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic congestion send if:%uz", cg->in_flight);
} else if (rc == NGX_DONE) {
/* no ack is expected for this frames, can free them */
@ -2386,6 +2474,8 @@ ngx_quic_send_frames(ngx_connection_t *c, ngx_queue_t *frames)
ngx_memzero(&pkt, sizeof(ngx_quic_header_t));
now = ngx_current_msec;
p = src;
out.data = src;
@ -2409,6 +2499,7 @@ ngx_quic_send_frames(ngx_connection_t *c, ngx_queue_t *frames)
p += len;
f->pnum = ctx->pnum;
f->last = now;
}
if (start->level == ssl_encryption_initial) {
@ -2476,9 +2567,6 @@ ngx_quic_send_frames(ngx_connection_t *c, ngx_queue_t *frames)
/* len == NGX_OK || NGX_AGAIN */
ctx->pnum++;
now = ngx_current_msec;
start->last = now;
return pkt.need_ack ? NGX_OK : NGX_DONE;
}
@ -2621,6 +2709,8 @@ ngx_quic_retransmit(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx,
} while (q != ngx_queue_sentinel(&ctx->sent));
ngx_quic_congestion_lost(c, start->last);
/* NGX_DONE is impossible here, such frames don't get into this queue */
if (ngx_quic_send_frames(c, &range) != NGX_OK) {
return NGX_ERROR;
@ -2781,6 +2871,12 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
log->connection = sn->c->number;
if ((id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0
|| (id & NGX_QUIC_STREAM_SERVER_INITIATED))
{
sn->c->write->ready = 1;
}
cln = ngx_pool_cleanup_add(pool, 0);
if (cln == NULL) {
ngx_close_connection(sn->c);
@ -2899,7 +2995,8 @@ static ssize_t
ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
{
u_char *p, *end;
size_t fsize, limit;
size_t fsize, limit, n, len;
uint64_t sent, unacked;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
@ -2923,8 +3020,22 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
limit = qc->ctp.max_packet_size - NGX_QUIC_MAX_SHORT_HEADER - 25
- EVP_GCM_TLS_TAG_LEN;
len = size;
sent = c->sent;
unacked = sent - qs->acked;
if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send hit buffer size");
len = 0;
} else if (unacked + len > NGX_QUIC_STREAM_BUFSIZE) {
len = NGX_QUIC_STREAM_BUFSIZE - unacked;
}
p = (u_char *) buf;
end = (u_char *) buf + size;
end = (u_char *) buf + len;
n = 0;
while (p < end) {
@ -2951,6 +3062,7 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
c->sent += fsize;
p += fsize;
n += fsize;
ngx_sprintf(frame->info, "stream 0x%xi len=%ui level=%d",
qs->id, fsize, frame->level);
@ -2958,7 +3070,19 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
ngx_quic_queue_frame(qc, frame);
}
return size;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream send %uz sent:%O, unacked:%uL",
n, c->sent, (uint64_t) c->sent - qs->acked);
if (n != size) {
c->write->ready = 0;
}
if (n == 0) {
return NGX_AGAIN;
}
return n;
}
@ -3121,6 +3245,83 @@ ngx_quic_alloc_frame(ngx_connection_t *c, size_t size)
}
static void
ngx_quic_congestion_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
{
ssize_t n;
ngx_msec_t timer;
ngx_quic_congestion_t *cg;
ngx_quic_connection_t *qc;
qc = c->quic;
cg = &qc->congestion;
n = ngx_quic_create_frame(NULL, f);
cg->in_flight -= n;
timer = f->last - cg->recovery_start;
if ((ngx_msec_int_t) timer <= 0) {
return;
}
if (cg->window < cg->ssthresh) {
cg->window += n;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic congestion slow start win:%uz, ss:%uz, if:%uz",
cg->window, cg->ssthresh, cg->in_flight);
} else {
cg->window += qc->tp.max_packet_size * n / cg->window;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic congestion avoidance win:%uz, ss:%uz, if:%uz",
cg->window, cg->ssthresh, cg->in_flight);
}
/* prevent recovery_start from wrapping */
timer = cg->recovery_start - ngx_current_msec + qc->tp.max_idle_timeout * 2;
if ((ngx_msec_int_t) timer < 0) {
cg->recovery_start = ngx_current_msec - qc->tp.max_idle_timeout * 2;
}
}
static void
ngx_quic_congestion_lost(ngx_connection_t *c, ngx_msec_t sent)
{
ngx_msec_t timer;
ngx_quic_congestion_t *cg;
ngx_quic_connection_t *qc;
qc = c->quic;
cg = &qc->congestion;
timer = sent - cg->recovery_start;
if ((ngx_msec_int_t) timer <= 0) {
return;
}
cg->recovery_start = ngx_current_msec;
cg->window /= 2;
if (cg->window < qc->tp.max_packet_size * 2) {
cg->window = qc->tp.max_packet_size * 2;
}
cg->ssthresh = cg->window;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic congestion lost win:%uz, ss:%uz, if:%uz",
cg->window, cg->ssthresh, cg->in_flight);
}
static void
ngx_quic_free_frame(ngx_connection_t *c, ngx_quic_frame_t *frame)
{

View File

@ -31,7 +31,7 @@
#define NGX_QUIC_STREAM_SERVER_INITIATED 0x01
#define NGX_QUIC_STREAM_UNIDIRECTIONAL 0x02
#define NGX_QUIC_STREAM_BUFSIZE 16384
#define NGX_QUIC_STREAM_BUFSIZE 65536
typedef struct {
@ -70,6 +70,7 @@ struct ngx_quic_stream_s {
ngx_connection_t *parent;
ngx_connection_t *c;
uint64_t id;
uint64_t acked;
ngx_buf_t *b;
ngx_quic_frames_stream_t fs;
};