QUIC: simplified stream initialization.

After creation, a client stream is added to qc->streams.uninitialized queue.
After initialization it's removed from the queue.  If a stream is never
initialized, it is freed in ngx_quic_close_streams().  Stream initializer
is now set as read event handler in stream connection.

Previously qc->streams.uninitialized was used only for delayed stream
initialization.

The change makes it possible not to handle separately the case of a new stream
in stream-related frame handlers.  It makes these handlers simpler since new
streams and existing streams are now handled by the same code.
This commit is contained in:
Roman Arutyunyan 2021-12-10 19:43:50 +03:00
parent 59312ddac1
commit 6ea39f03ae

View File

@ -13,10 +13,9 @@
#define NGX_QUIC_STREAM_GONE (void *) -1
static ngx_quic_stream_t *ngx_quic_create_client_stream(ngx_connection_t *c,
uint64_t id);
static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
static ngx_int_t ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id);
static ngx_int_t ngx_quic_init_stream(ngx_quic_stream_t *qs);
static void ngx_quic_init_stream_handler(ngx_event_t *ev);
static void ngx_quic_init_streams_handler(ngx_connection_t *c);
static ngx_quic_stream_t *ngx_quic_create_stream(ngx_connection_t *c,
uint64_t id);
@ -306,21 +305,28 @@ ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
static ngx_quic_stream_t *
ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
ngx_quic_get_stream(ngx_connection_t *c, uint64_t id)
{
uint64_t min_id;
ngx_event_t *rev;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL is new", id);
qc = ngx_quic_get_connection(c);
qs = ngx_quic_find_stream(&qc->streams.tree, id);
if (qs) {
return qs;
}
if (qc->shutdown || qc->closing) {
return NGX_QUIC_STREAM_GONE;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL is missing", id);
if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
@ -377,7 +383,11 @@ ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
* streams of that type with lower-numbered stream IDs also being opened.
*/
for ( /* void */ ; min_id < id; min_id += 0x04) {
#if (NGX_SUPPRESS_WARN)
qs = NULL;
#endif
for ( /* void */ ; min_id <= id; min_id += 0x04) {
qs = ngx_quic_create_stream(c, min_id);
@ -389,22 +399,17 @@ ngx_quic_create_client_stream(ngx_connection_t *c, uint64_t id)
continue;
}
if (ngx_quic_init_stream(qs) != NGX_OK) {
return NULL;
}
ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);
if (qc->shutdown || qc->closing) {
return NGX_QUIC_STREAM_GONE;
rev = qs->connection->read;
rev->handler = ngx_quic_init_stream_handler;
if (qc->streams.initialized) {
ngx_post_event(rev, &ngx_posted_events);
}
}
qs = ngx_quic_create_stream(c, id);
if (qs == NULL) {
if (ngx_quic_reject_stream(c, id) != NGX_OK) {
return NULL;
}
return NGX_QUIC_STREAM_GONE;
}
@ -461,29 +466,20 @@ ngx_quic_reject_stream(ngx_connection_t *c, uint64_t id)
}
static ngx_int_t
ngx_quic_init_stream(ngx_quic_stream_t *qs)
static void
ngx_quic_init_stream_handler(ngx_event_t *ev)
{
ngx_connection_t *c;
ngx_quic_connection_t *qc;
ngx_connection_t *c;
ngx_quic_stream_t *qs;
qc = ngx_quic_get_connection(qs->parent);
c = qs->connection;
if (!qc->streams.initialized) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic postpone stream init");
ngx_queue_insert_tail(&qc->streams.uninitialized, &qs->queue);
return NGX_OK;
}
c = ev->data;
qs = c->quic;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic init stream");
c->listening->handler(c);
ngx_queue_remove(&qs->queue);
return NGX_OK;
c->listening->handler(c);
}
@ -527,16 +523,12 @@ ngx_quic_init_streams_handler(ngx_connection_t *c)
qc = ngx_quic_get_connection(c);
while (!ngx_queue_empty(&qc->streams.uninitialized)) {
q = ngx_queue_head(&qc->streams.uninitialized);
ngx_queue_remove(q);
for (q = ngx_queue_head(&qc->streams.uninitialized);
q != ngx_queue_sentinel(&qc->streams.uninitialized);
q = ngx_queue_next(q))
{
qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
"quic init postponed stream");
qs->connection->listening->handler(qs->connection);
ngx_post_event(qs->connection->read, &ngx_posted_events);
}
qc->streams.initialized = 1;
@ -1013,7 +1005,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
ngx_quic_frame_t *frame)
{
uint64_t last;
ngx_pool_t *pool;
ngx_event_t *rev;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
@ -1033,39 +1024,14 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
/* no overflow since both values are 62-bit */
last = f->offset + f->length;
qs = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
qs = ngx_quic_get_stream(c, f->stream_id);
if (qs == NULL) {
qs = ngx_quic_create_client_stream(c, f->stream_id);
return NGX_ERROR;
}
if (qs == NULL) {
return NGX_ERROR;
}
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
sc = qs->connection;
if (ngx_quic_control_flow(sc, last) != NGX_OK) {
goto cleanup;
}
if (f->fin) {
sc->read->pending_eof = 1;
qs->final_size = last;
}
if (f->offset == 0) {
sc->read->ready = 1;
}
if (ngx_quic_order_bufs(c, &qs->in, frame->data, f->offset) != NGX_OK) {
goto cleanup;
}
return ngx_quic_init_stream(qs);
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
sc = qs->connection;
@ -1125,15 +1091,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
}
return NGX_OK;
cleanup:
pool = sc->pool;
ngx_close_connection(sc);
ngx_destroy_pool(pool);
return NGX_ERROR;
}
@ -1210,20 +1167,14 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
return NGX_ERROR;
}
qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
qs = ngx_quic_get_stream(c, f->id);
if (qs == NULL) {
qs = ngx_quic_create_client_stream(c, f->id);
return NGX_ERROR;
}
if (qs == NULL) {
return NGX_ERROR;
}
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
return ngx_quic_init_stream(qs);
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
return ngx_quic_update_max_stream_data(qs->connection);
@ -1248,24 +1199,14 @@ ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
return NGX_ERROR;
}
qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
qs = ngx_quic_get_stream(c, f->id);
if (qs == NULL) {
qs = ngx_quic_create_client_stream(c, f->id);
return NGX_ERROR;
}
if (qs == NULL) {
return NGX_ERROR;
}
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
if (f->limit > qs->send_max_data) {
qs->send_max_data = f->limit;
}
return ngx_quic_init_stream(qs);
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
if (f->limit <= qs->send_max_data) {
@ -1293,7 +1234,6 @@ ngx_int_t
ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
{
ngx_pool_t *pool;
ngx_event_t *rev;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
@ -1308,36 +1248,14 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
return NGX_ERROR;
}
qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
qs = ngx_quic_get_stream(c, f->id);
if (qs == NULL) {
qs = ngx_quic_create_client_stream(c, f->id);
return NGX_ERROR;
}
if (qs == NULL) {
return NGX_ERROR;
}
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
sc = qs->connection;
rev = sc->read;
rev->error = 1;
rev->ready = 1;
if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
goto cleanup;
}
qs->final_size = f->final_size;
if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
goto cleanup;
}
return ngx_quic_init_stream(qs);
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
sc = qs->connection;
@ -1371,15 +1289,6 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
}
return NGX_OK;
cleanup:
pool = sc->pool;
ngx_close_connection(sc);
ngx_destroy_pool(pool);
return NGX_ERROR;
}
@ -1387,9 +1296,7 @@ ngx_int_t
ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stop_sending_frame_t *f)
{
ngx_pool_t *pool;
ngx_event_t *wev;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@ -1402,31 +1309,14 @@ ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
return NGX_ERROR;
}
qs = ngx_quic_find_stream(&qc->streams.tree, f->id);
qs = ngx_quic_get_stream(c, f->id);
if (qs == NULL) {
qs = ngx_quic_create_client_stream(c, f->id);
return NGX_ERROR;
}
if (qs == NULL) {
return NGX_ERROR;
}
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
sc = qs->connection;
if (ngx_quic_reset_stream(sc, f->error_code) != NGX_OK) {
pool = sc->pool;
ngx_close_connection(sc);
ngx_destroy_pool(pool);
return NGX_ERROR;
}
return ngx_quic_init_stream(qs);
if (qs == NGX_QUIC_STREAM_GONE) {
return NGX_OK;
}
if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {