diff --git a/src/stream/ngx_stream.c b/src/stream/ngx_stream.c index 9aed0a879..7312c3e2e 100644 --- a/src/stream/ngx_stream.c +++ b/src/stream/ngx_stream.c @@ -302,6 +302,13 @@ ngx_stream_init_phases(ngx_conf_t *cf, ngx_stream_core_main_conf_t *cmcf) } #endif + if (ngx_array_init(&cmcf->phases[NGX_STREAM_PREREAD_PHASE].handlers, + cf->pool, 1, sizeof(ngx_stream_handler_pt)) + != NGX_OK) + { + return NGX_ERROR; + } + if (ngx_array_init(&cmcf->phases[NGX_STREAM_LOG_PHASE].handlers, cf->pool, 1, sizeof(ngx_stream_handler_pt)) != NGX_OK) @@ -343,6 +350,10 @@ ngx_stream_init_phase_handlers(ngx_conf_t *cf, switch (i) { + case NGX_STREAM_PREREAD_PHASE: + checker = ngx_stream_core_preread_phase; + break; + case NGX_STREAM_CONTENT_PHASE: ph->checker = ngx_stream_core_content_phase; n++; diff --git a/src/stream/ngx_stream.h b/src/stream/ngx_stream.h index 0aded1691..deca8ae62 100644 --- a/src/stream/ngx_stream.h +++ b/src/stream/ngx_stream.h @@ -122,6 +122,7 @@ typedef enum { #if (NGX_STREAM_SSL) NGX_STREAM_SSL_PHASE, #endif + NGX_STREAM_PREREAD_PHASE, NGX_STREAM_CONTENT_PHASE, NGX_STREAM_LOG_PHASE } ngx_stream_phases; @@ -181,6 +182,8 @@ typedef struct { ngx_uint_t line; ngx_flag_t tcp_nodelay; + size_t preread_buffer_size; + ngx_msec_t preread_timeout; ngx_log_t *error_log; @@ -280,6 +283,8 @@ typedef struct { void ngx_stream_core_run_phases(ngx_stream_session_t *s); ngx_int_t ngx_stream_core_generic_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph); +ngx_int_t ngx_stream_core_preread_phase(ngx_stream_session_t *s, + ngx_stream_phase_handler_t *ph); ngx_int_t ngx_stream_core_content_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph); diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c index 3a9335e77..23644f3d2 100644 --- a/src/stream/ngx_stream_core_module.c +++ b/src/stream/ngx_stream_core_module.c @@ -91,6 +91,20 @@ static ngx_command_t ngx_stream_core_commands[] = { offsetof(ngx_stream_core_srv_conf_t, tcp_nodelay), NULL }, + { ngx_string("preread_buffer_size"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_core_srv_conf_t, preread_buffer_size), + NULL }, + + { ngx_string("preread_timeout"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_core_srv_conf_t, preread_timeout), + NULL }, + ngx_null_command }; @@ -153,7 +167,7 @@ ngx_stream_core_generic_phase(ngx_stream_session_t *s, /* * generic phase checker, - * used by all phases, except for content + * used by all phases, except for preread and content */ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, @@ -185,6 +199,112 @@ ngx_stream_core_generic_phase(ngx_stream_session_t *s, } +ngx_int_t +ngx_stream_core_preread_phase(ngx_stream_session_t *s, + ngx_stream_phase_handler_t *ph) +{ + size_t size; + ssize_t n; + ngx_int_t rc; + ngx_connection_t *c; + ngx_stream_core_srv_conf_t *cscf; + + c = s->connection; + + c->log->action = "prereading client data"; + + cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); + + if (c->read->timedout) { + rc = NGX_STREAM_OK; + + } else if (c->read->timer_set) { + rc = NGX_AGAIN; + + } else { + rc = ph->handler(s); + } + + while (rc == NGX_AGAIN) { + + if (c->buffer == NULL) { + c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size); + if (c->buffer == NULL) { + rc = NGX_ERROR; + break; + } + } + + size = c->buffer->end - c->buffer->last; + + if (size == 0) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full"); + rc = NGX_STREAM_BAD_REQUEST; + break; + } + + if (c->read->eof) { + rc = NGX_STREAM_OK; + break; + } + + if (!c->read->ready) { + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + rc = NGX_ERROR; + break; + } + + if (!c->read->timer_set) { + ngx_add_timer(c->read, cscf->preread_timeout); + } + + c->read->handler = ngx_stream_session_handler; + + return NGX_OK; + } + + n = c->recv(c, c->buffer->last, size); + + if (n == NGX_ERROR) { + rc = NGX_STREAM_OK; + break; + } + + if (n > 0) { + c->buffer->last += n; + } + + rc = ph->handler(s); + } + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + if (rc == NGX_OK) { + s->phase_handler = ph->next; + return NGX_AGAIN; + } + + if (rc == NGX_DECLINED) { + s->phase_handler++; + return NGX_AGAIN; + } + + if (rc == NGX_DONE) { + return NGX_OK; + } + + if (rc == NGX_ERROR) { + rc = NGX_STREAM_INTERNAL_SERVER_ERROR; + } + + ngx_stream_finalize_session(s, rc); + + return NGX_OK; +} + + ngx_int_t ngx_stream_core_content_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) @@ -303,6 +423,8 @@ ngx_stream_core_create_srv_conf(ngx_conf_t *cf) cscf->resolver_timeout = NGX_CONF_UNSET_MSEC; cscf->proxy_protocol_timeout = NGX_CONF_UNSET_MSEC; cscf->tcp_nodelay = NGX_CONF_UNSET; + cscf->preread_buffer_size = NGX_CONF_UNSET_SIZE; + cscf->preread_timeout = NGX_CONF_UNSET_MSEC; return cscf; } @@ -355,6 +477,12 @@ ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->tcp_nodelay, prev->tcp_nodelay, 1); + ngx_conf_merge_size_value(conf->preread_buffer_size, + prev->preread_buffer_size, 16384); + + ngx_conf_merge_msec_value(conf->preread_timeout, + prev->preread_timeout, 30000); + return NGX_CONF_OK; }