mirror of
https://github.com/nginx/nginx.git
synced 2025-02-25 18:55:26 -06:00
QUIC: create streams for STREAM_DATA_BLOCKED and MAX_STREAM_DATA.
Creating client-initiated streams is moved from ngx_quic_handle_stream_frame() to a separate function ngx_quic_create_client_stream(). This function is responsible for creating streams with lower ids as well. Also, simplified and fixed initial data buffering in ngx_quic_handle_stream_frame(). It is now done before calling the initial handler as the handler can destroy the stream.
This commit is contained in:
@@ -34,6 +34,8 @@
|
||||
*/
|
||||
#define NGX_QUIC_MAX_BUFFERED 65535
|
||||
|
||||
#define NGX_QUIC_STREAM_GONE (void *) -1
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_rbtree_t tree;
|
||||
@@ -270,6 +272,8 @@ static void ngx_quic_rbtree_insert_stream(ngx_rbtree_node_t *temp,
|
||||
ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
|
||||
static ngx_quic_stream_t *ngx_quic_find_stream(ngx_rbtree_t *rbtree,
|
||||
uint64_t id);
|
||||
static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
|
||||
uint64_t id);
|
||||
static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
|
||||
uint64_t id, size_t rcvbuf_size);
|
||||
static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
|
||||
@@ -2893,10 +2897,8 @@ static ngx_int_t
|
||||
ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
||||
ngx_quic_frame_t *frame)
|
||||
{
|
||||
size_t n;
|
||||
uint64_t id;
|
||||
ngx_buf_t *b;
|
||||
ngx_event_t *rev;
|
||||
ngx_pool_t *pool;
|
||||
ngx_connection_t *sc;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_quic_connection_t *qc;
|
||||
ngx_quic_stream_frame_t *f;
|
||||
@@ -2915,92 +2917,34 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
|
||||
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
|
||||
|
||||
if (sn == NULL) {
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic stream id 0x%xL is new", f->stream_id);
|
||||
sn = ngx_quic_create_client_stream(c, f->stream_id);
|
||||
|
||||
if (f->stream_id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
|
||||
if (sn == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
|
||||
if ((f->stream_id >> 2) < qc->streams.client_streams_uni) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if ((f->stream_id >> 2) >= qc->streams.client_max_streams_uni) {
|
||||
qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
id = (qc->streams.client_streams_uni << 2)
|
||||
| NGX_QUIC_STREAM_UNIDIRECTIONAL;
|
||||
qc->streams.client_streams_uni = (f->stream_id >> 2) + 1;
|
||||
n = qc->tp.initial_max_stream_data_uni;
|
||||
|
||||
} else {
|
||||
if ((f->stream_id >> 2) < qc->streams.client_streams_bidi) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if ((f->stream_id >> 2) >= qc->streams.client_max_streams_bidi) {
|
||||
qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
id = (qc->streams.client_streams_bidi << 2);
|
||||
qc->streams.client_streams_bidi = (f->stream_id >> 2) + 1;
|
||||
n = qc->tp.initial_max_stream_data_bidi_remote;
|
||||
}
|
||||
|
||||
if (n < NGX_QUIC_STREAM_BUFSIZE) {
|
||||
n = NGX_QUIC_STREAM_BUFSIZE;
|
||||
}
|
||||
|
||||
if (n < f->length) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"quic no space in stream buffer");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
* 2.1. Stream Types and Identifiers
|
||||
*
|
||||
* Within each type, streams are created with numerically increasing
|
||||
* stream IDs. A stream ID that is used out of order results in all
|
||||
* streams of that type with lower-numbered stream IDs also being
|
||||
* opened.
|
||||
*/
|
||||
|
||||
for ( /* void */ ; id <= f->stream_id; id += 0x04) {
|
||||
|
||||
sn = ngx_quic_create_stream(c, id, n);
|
||||
if (sn == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (id == f->stream_id && f->offset == 0) {
|
||||
b = sn->b;
|
||||
b->last = ngx_cpymem(b->last, f->data, f->length);
|
||||
|
||||
sn->fs.received += f->length;
|
||||
|
||||
rev = sn->c->read;
|
||||
rev->ready = 1;
|
||||
|
||||
if (f->fin) {
|
||||
rev->pending_eof = 1;
|
||||
}
|
||||
}
|
||||
|
||||
sn->c->listening->handler(sn->c);
|
||||
}
|
||||
|
||||
if (f->offset == 0) {
|
||||
if (sn == NGX_QUIC_STREAM_GONE) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* out-of-order stream: proceed to buffering */
|
||||
sc = sn->c;
|
||||
fs = &sn->fs;
|
||||
|
||||
if (ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input,
|
||||
sn)
|
||||
!= NGX_OK)
|
||||
{
|
||||
pool = sc->pool;
|
||||
|
||||
ngx_close_connection(sc);
|
||||
ngx_destroy_pool(pool);
|
||||
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
sc->listening->handler(sc);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
fs = &sn->fs;
|
||||
@@ -3026,8 +2970,6 @@ ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame, void *data)
|
||||
f = &frame->u.stream;
|
||||
id = f->stream_id;
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic existing stream");
|
||||
|
||||
b = sn->b;
|
||||
|
||||
if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
|
||||
@@ -3132,13 +3074,25 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
|
||||
sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
|
||||
|
||||
if (sn == NULL) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"quic unknown stream id:0x%xL", f->id);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
sn = ngx_quic_create_client_stream(c, f->id);
|
||||
|
||||
b = sn->b;
|
||||
n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
|
||||
if (sn == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (sn == NGX_QUIC_STREAM_GONE) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
b = sn->b;
|
||||
n = b->end - b->last;
|
||||
|
||||
sn->c->listening->handler(sn->c);
|
||||
|
||||
} else {
|
||||
b = sn->b;
|
||||
n = sn->fs.received + (b->pos - b->start) + (b->end - b->last);
|
||||
}
|
||||
|
||||
frame = ngx_quic_alloc_frame(c, 0);
|
||||
if (frame == NULL) {
|
||||
@@ -3182,14 +3136,23 @@ ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
|
||||
sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
|
||||
|
||||
if (sn == NULL) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"unknown stream id:0x%xL", f->id);
|
||||
sn = ngx_quic_create_client_stream(c, f->id);
|
||||
|
||||
if (f->id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
|
||||
if (sn == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
return NGX_ERROR;
|
||||
if (sn == NGX_QUIC_STREAM_GONE) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (f->limit > sn->send_max_data) {
|
||||
sn->send_max_data = f->limit;
|
||||
}
|
||||
|
||||
sn->c->listening->handler(sn->c);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (f->limit <= sn->send_max_data) {
|
||||
@@ -3886,6 +3849,96 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
|
||||
}
|
||||
|
||||
|
||||
static ngx_quic_stream_t *
|
||||
ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
|
||||
{
|
||||
size_t n;
|
||||
uint64_t min_id;
|
||||
ngx_quic_stream_t *sn;
|
||||
ngx_quic_connection_t *qc;
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"quic stream id 0x%xL is new", id);
|
||||
|
||||
qc = c->quic;
|
||||
|
||||
if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
|
||||
|
||||
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
if ((id >> 2) < qc->streams.server_streams_uni) {
|
||||
return NGX_QUIC_STREAM_GONE;
|
||||
}
|
||||
|
||||
qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((id >> 2) < qc->streams.client_streams_uni) {
|
||||
return NGX_QUIC_STREAM_GONE;
|
||||
}
|
||||
|
||||
if ((id >> 2) >= qc->streams.client_max_streams_uni) {
|
||||
qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
min_id = (qc->streams.client_streams_uni << 2)
|
||||
| NGX_QUIC_STREAM_UNIDIRECTIONAL;
|
||||
qc->streams.client_streams_uni = (id >> 2) + 1;
|
||||
n = qc->tp.initial_max_stream_data_uni;
|
||||
|
||||
} else {
|
||||
|
||||
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
|
||||
if ((id >> 2) < qc->streams.server_streams_bidi) {
|
||||
return NGX_QUIC_STREAM_GONE;
|
||||
}
|
||||
|
||||
qc->error = NGX_QUIC_ERR_STREAM_STATE_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((id >> 2) < qc->streams.client_streams_bidi) {
|
||||
return NGX_QUIC_STREAM_GONE;
|
||||
}
|
||||
|
||||
if ((id >> 2) >= qc->streams.client_max_streams_bidi) {
|
||||
qc->error = NGX_QUIC_ERR_STREAM_LIMIT_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
min_id = (qc->streams.client_streams_bidi << 2);
|
||||
qc->streams.client_streams_bidi = (id >> 2) + 1;
|
||||
n = qc->tp.initial_max_stream_data_bidi_remote;
|
||||
}
|
||||
|
||||
if (n < NGX_QUIC_STREAM_BUFSIZE) {
|
||||
n = NGX_QUIC_STREAM_BUFSIZE;
|
||||
}
|
||||
|
||||
/*
|
||||
* 2.1. Stream Types and Identifiers
|
||||
*
|
||||
* Within each type, streams are created with numerically increasing
|
||||
* stream IDs. A stream ID that is used out of order results in all
|
||||
* streams of that type with lower-numbered stream IDs also being
|
||||
* opened.
|
||||
*/
|
||||
|
||||
for ( /* void */ ; min_id < id; min_id += 0x04) {
|
||||
|
||||
sn = ngx_quic_create_stream(c, min_id, n);
|
||||
if (sn == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
sn->c->listening->handler(sn->c);
|
||||
}
|
||||
|
||||
return ngx_quic_create_stream(c, id, n);
|
||||
}
|
||||
|
||||
|
||||
static ngx_quic_stream_t *
|
||||
ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user