Stream "connection" read/write methods.

This commit is contained in:
Vladimir Homutov 2020-03-13 14:39:23 +03:00
parent 5bc8cd4044
commit 05d1464c68
2 changed files with 256 additions and 9 deletions

View File

@ -60,7 +60,7 @@
(*(uint32_t *) (p) = htonl((uint32_t) (s)), (p) + sizeof(uint32_t))
#define ngx_quic_varint_len(value) \
((value) <= 63 ? 1 : (value) <= 16383 ? 2 : (value) <= 1073741823 ? 4 : 8)
((value) <= 63 ? 1 : ((uint32_t)value) <= 16383 ? 2 : ((uint64_t)value) <= 1073741823 ? 4 : 8)
#if (NGX_DEBUG)
@ -105,7 +105,7 @@ do { \
#define NGX_QUIC_FT_STOP_SENDING 0x05
#define NGX_QUIC_FT_CRYPTO 0x06
#define NGX_QUIC_FT_NEW_TOKEN 0x07
#define NGX_QUIC_FT_STREAM 0x08
#define NGX_QUIC_FT_STREAM0 0x08
#define NGX_QUIC_FT_STREAM1 0x09
#define NGX_QUIC_FT_STREAM2 0x0A
#define NGX_QUIC_FT_STREAM3 0x0B
@ -130,6 +130,12 @@ do { \
#define NGX_QUIC_FT_HANDSHAKE_DONE 0x1e
#define ngx_quic_stream_bit_off(val) (((val) & 0x04) ? 1 : 0)
#define ngx_quic_stream_bit_len(val) (((val) & 0x02) ? 1 : 0)
#define ngx_quic_stream_bit_fin(val) (((val) & 0x01) ? 1 : 0)
/* TODO: real states, these are stubs */
typedef enum {
NGX_QUIC_ST_INITIAL,
@ -184,6 +190,7 @@ typedef struct {
typedef struct {
uint8_t type;
uint64_t stream_id;
uint64_t offset;
uint64_t length;
@ -350,6 +357,13 @@ static ngx_int_t ngx_quic_tls_hp(ngx_connection_t *c, const EVP_CIPHER *cipher,
static ngx_int_t ngx_quic_ciphers(ngx_connection_t *c,
ngx_quic_ciphers_t *ciphers, enum ssl_encryption_level_t level);
static ssize_t ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf,
size_t size);
static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
size_t size);
static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
ngx_chain_t *in, off_t limit);
static SSL_QUIC_METHOD quic_method = {
#if BORINGSSL_API_VERSION >= 10
ngx_quic_set_read_secret,
@ -640,6 +654,57 @@ ngx_quic_create_crypto(u_char *p, ngx_quic_crypto_frame_t *crypto)
return p - start;
}
static size_t
ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf)
{
size_t len;
u_char *start;
if (!ngx_quic_stream_bit_len(sf->type)) {
#if 0
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"attempt to generate a stream frame without length");
#endif
// XXX: handle error in caller
return NGX_ERROR;
}
if (p == NULL) {
len = ngx_quic_varint_len(sf->type);
if (ngx_quic_stream_bit_off(sf->type)) {
len += ngx_quic_varint_len(sf->offset);
}
len += ngx_quic_varint_len(sf->stream_id);
/* length is always present in generated frames */
len += ngx_quic_varint_len(sf->length);
len += sf->length;
return len;
}
start = p;
ngx_quic_build_int(&p, sf->type);
ngx_quic_build_int(&p, sf->stream_id);
if (ngx_quic_stream_bit_off(sf->type)) {
ngx_quic_build_int(&p, sf->offset);
}
/* length is always present in generated frames */
ngx_quic_build_int(&p, sf->length);
p = ngx_cpymem(p, sf->data, sf->length);
return p - start;
}
size_t
ngx_quic_frame_len(ngx_quic_frame_t *frame)
{
@ -648,6 +713,16 @@ ngx_quic_frame_len(ngx_quic_frame_t *frame)
return ngx_quic_create_ack(NULL, &frame->u.ack);
case NGX_QUIC_FT_CRYPTO:
return ngx_quic_create_crypto(NULL, &frame->u.crypto);
case NGX_QUIC_FT_STREAM0:
case NGX_QUIC_FT_STREAM1:
case NGX_QUIC_FT_STREAM2:
case NGX_QUIC_FT_STREAM3:
case NGX_QUIC_FT_STREAM4:
case NGX_QUIC_FT_STREAM5:
case NGX_QUIC_FT_STREAM6:
case NGX_QUIC_FT_STREAM7:
return ngx_quic_create_stream(NULL, &frame->u.stream);
default:
/* BUG: unsupported frame type generated */
return 0;
@ -687,6 +762,17 @@ ngx_quic_frames_send(ngx_connection_t *c, ngx_quic_frame_t *start,
p += ngx_quic_create_crypto(p, &f->u.crypto);
break;
case NGX_QUIC_FT_STREAM0:
case NGX_QUIC_FT_STREAM1:
case NGX_QUIC_FT_STREAM2:
case NGX_QUIC_FT_STREAM3:
case NGX_QUIC_FT_STREAM4:
case NGX_QUIC_FT_STREAM5:
case NGX_QUIC_FT_STREAM6:
case NGX_QUIC_FT_STREAM7:
p += ngx_quic_create_stream(p, &f->u.stream);
break;
default:
/* BUG: unsupported frame type generated */
return NGX_ERROR;
@ -1653,7 +1739,7 @@ ngx_quic_read_frame(ngx_connection_t *c, u_char *start, u_char *end,
return NGX_ERROR;
break;
case NGX_QUIC_FT_STREAM:
case NGX_QUIC_FT_STREAM0:
case NGX_QUIC_FT_STREAM1:
case NGX_QUIC_FT_STREAM2:
case NGX_QUIC_FT_STREAM3:
@ -1665,6 +1751,8 @@ ngx_quic_read_frame(ngx_connection_t *c, u_char *start, u_char *end,
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"STREAM frame, type: 0x%xi", frame->type);
frame->u.stream.type = frame->type;
frame->u.stream.stream_id = ngx_quic_parse_int(&p);
if (frame->type & 0x04) {
frame->u.stream.offset = ngx_quic_parse_int(&p);
@ -1797,6 +1885,109 @@ ngx_quic_init_connection(ngx_connection_t *c, ngx_quic_header_t *pkt)
}
static ssize_t
ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
{
ssize_t len;
ngx_buf_t *b;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
qs = c->qs;
qc = qs->parent->quic;
// XXX: get direct pointer from stream structure?
sn = ngx_quic_stream_lookup(&qc->stree, qs->id);
if (sn == NULL) {
return NGX_ERROR;
}
// XXX: how to return EOF?
b = sn->b;
if (b->last - b->pos == 0) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic recv() not ready");
return NGX_AGAIN; // ?
}
len = ngx_min(b->last - b->pos, (ssize_t) size);
ngx_memcpy(buf, b->pos, len);
b->pos += len;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic recv: %z of %uz", len, size);
return len;
}
static ssize_t
ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
{
u_char *p;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic send: %uz", size);
qs = c->qs;
qc = qs->parent->quic;
// XXX: get direct pointer from stream structure?
sn = ngx_quic_stream_lookup(&qc->stree, qs->id);
if (sn == NULL) {
return NGX_ERROR;
}
frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t));
if (frame == NULL) {
return 0;
}
p = ngx_pnalloc(c->pool, size);
if (p == NULL) {
return 0;
}
ngx_memcpy(p, buf, size);
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_STREAM2; /* OFF=0 LEN=1 FIN=0 */
frame->u.stream.type = frame->type;
frame->u.stream.stream_id = qs->id;
frame->u.stream.offset = 0;
frame->u.stream.length = size;
frame->u.stream.data = p;
ngx_sprintf(frame->info, "stream %xi len=%ui level=%d",
qs->id, size, frame->level);
ngx_quic_queue_frame(qc, frame);
return size;
}
static ngx_chain_t *
ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in,
off_t limit)
{
// TODO
return NULL;
}
/* process all payload from the current packet and generate ack if required */
static ngx_int_t
ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
@ -1805,6 +1996,8 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ssize_t len;
ngx_buf_t *b;
ngx_uint_t ack_this;
ngx_pool_t *pool;
ngx_event_t *rev, *wev;
ngx_quic_frame_t frame, *ack_frame;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
@ -1873,7 +2066,7 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
frame.u.ncid.len);
continue;
case NGX_QUIC_FT_STREAM:
case NGX_QUIC_FT_STREAM0:
case NGX_QUIC_FT_STREAM1:
case NGX_QUIC_FT_STREAM2:
case NGX_QUIC_FT_STREAM3:
@ -1882,12 +2075,15 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
case NGX_QUIC_FT_STREAM6:
case NGX_QUIC_FT_STREAM7:
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
"STREAM frame 0x%xi id 0x%xi off 0x%xi len 0x%xi",
ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
"STREAM frame 0x%xi id 0x%xi offset 0x%xi len 0x%xi bits:off=%d len=%d fin=%d",
frame.type,
frame.u.stream.stream_id,
frame.u.stream.offset,
frame.u.stream.length);
frame.u.stream.length,
ngx_quic_stream_bit_off(frame.u.stream.type),
ngx_quic_stream_bit_len(frame.u.stream.type),
ngx_quic_stream_bit_fin(frame.u.stream.type));
sn = ngx_quic_stream_lookup(&qc->stree, frame.u.stream.stream_id);
@ -1899,13 +2095,28 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
return NGX_ERROR;
}
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
if (pool == NULL) {
return NGX_ERROR;
}
sn->c = ngx_get_connection(-1, c->log); // TODO: free on connection termination
if (sn->c == NULL) {
return NGX_ERROR;
}
sn->c->pool = pool;
rev = sn->c->read;
wev = sn->c->write;
rev->log = c->log;
wev->log = c->log;
sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
sn->node.key = frame.u.stream.stream_id;
sn->b = ngx_create_temp_buf(c->pool, 16 * 1024); // XXX enough for everyone
sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
if (sn->b == NULL) {
return NGX_ERROR;
}
@ -1917,9 +2128,14 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ngx_rbtree_insert(&qc->stree, &sn->node);
sn->s.id = frame.u.stream.stream_id;
sn->s.unidirectional = (sn->s.id & 0x02) ? 1 : 0;
sn->s.parent = c;
sn->c->qs = &sn->s;
sn->c->recv = ngx_quic_stream_recv;
sn->c->send = ngx_quic_stream_send;
sn->c->send_chain = ngx_quic_stream_send_chain;
qc->stream_handler(sn->c);
} else {
@ -1973,7 +2189,7 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ack_frame->type = NGX_QUIC_FT_ACK;
ack_frame->u.ack.pn = pkt->pn;
ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler", pkt->pn);
ngx_sprintf(ack_frame->info, "ACK for PN=%d from frame handler level=%d", pkt->pn, pkt->level);
ngx_quic_queue_frame(qc, ack_frame);
return ngx_quic_output(c);

View File

@ -395,8 +395,39 @@ ngx_http_quic_stream_handler(ngx_connection_t *c)
{
ngx_quic_stream_t *qs = c->qs;
// STUB for stream read/write
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"quic stream: 0x%uXL", qs->id);
ssize_t n;
ngx_buf_t b;
u_char buf[512];
b.start = buf;
b.end = buf + 512;
b.pos = b.last = b.start;
n = c->recv(c, b.pos, b.end - b.start);
if (n < 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream read failed");
return;
}
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"quic stream: 0x%uXL %ui bytes read", qs->id, n);
b.last += n;
n = c->send(c, b.start, n);
if (n < 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "stream write failed");
return;
}
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"quic stream: 0x%uXL %ui bytes written", qs->id, n);
}