diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c index f0b79341d..2ef39af24 100644 --- a/src/stream/ngx_stream_core_module.c +++ b/src/stream/ngx_stream_core_module.c @@ -10,6 +10,11 @@ #include +static ngx_uint_t ngx_stream_preread_can_peek(ngx_connection_t *c); +static ngx_int_t ngx_stream_preread_peek(ngx_stream_session_t *s, + ngx_stream_phase_handler_t *ph); +static ngx_int_t ngx_stream_preread(ngx_stream_session_t *s, + ngx_stream_phase_handler_t *ph); static ngx_int_t ngx_stream_core_preconfiguration(ngx_conf_t *cf); static void *ngx_stream_core_create_main_conf(ngx_conf_t *cf); static char *ngx_stream_core_init_main_conf(ngx_conf_t *cf, void *conf); @@ -203,8 +208,6 @@ ngx_int_t ngx_stream_core_preread_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) { - size_t size; - ssize_t n; ngx_int_t rc; ngx_connection_t *c; ngx_stream_core_srv_conf_t *cscf; @@ -217,56 +220,33 @@ ngx_stream_core_preread_phase(ngx_stream_session_t *s, if (c->read->timedout) { rc = NGX_STREAM_OK; + goto done; + } - } else if (c->read->timer_set) { - rc = NGX_AGAIN; + if (!c->read->timer_set) { + rc = ph->handler(s); + + if (rc != NGX_AGAIN) { + goto done; + } + } + + if (c->buffer == NULL) { + c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size); + if (c->buffer == NULL) { + rc = NGX_ERROR; + goto done; + } + } + + if (ngx_stream_preread_can_peek(c)) { + rc = ngx_stream_preread_peek(s, ph); } else { - rc = ph->handler(s); + rc = ngx_stream_preread(s, ph); } - while (rc == NGX_AGAIN) { - - if (c->buffer == NULL) { - c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size); - if (c->buffer == NULL) { - rc = NGX_ERROR; - break; - } - } - - size = c->buffer->end - c->buffer->last; - - if (size == 0) { - ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full"); - rc = NGX_STREAM_BAD_REQUEST; - break; - } - - if (c->read->eof) { - rc = NGX_STREAM_OK; - break; - } - - if (!c->read->ready) { - break; - } - - n = c->recv(c, c->buffer->last, size); - - if (n == NGX_ERROR || n == 0) { - rc = NGX_STREAM_OK; - break; - } - - if (n == NGX_AGAIN) { - break; - } - - c->buffer->last += n; - - rc = ph->handler(s); - } +done: if (rc == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { @@ -311,6 +291,129 @@ ngx_stream_core_preread_phase(ngx_stream_session_t *s, } +static ngx_uint_t +ngx_stream_preread_can_peek(ngx_connection_t *c) +{ +#if (NGX_STREAM_SSL) + if (c->ssl) { + return 0; + } +#endif + + if ((ngx_event_flags & NGX_USE_CLEAR_EVENT) == 0) { + return 0; + } + +#if (NGX_HAVE_KQUEUE) + if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { + return 1; + } +#endif + +#if (NGX_HAVE_EPOLLRDHUP) + if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ngx_use_epoll_rdhup) { + return 1; + } +#endif + + return 0; +} + + +static ngx_int_t +ngx_stream_preread_peek(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) +{ + ssize_t n; + ngx_int_t rc; + ngx_err_t err; + ngx_connection_t *c; + + c = s->connection; + + n = recv(c->fd, (char *) c->buffer->last, + c->buffer->end - c->buffer->last, MSG_PEEK); + + err = ngx_socket_errno; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream recv(): %z", n); + + if (n == -1) { + if (err == NGX_EAGAIN) { + c->read->ready = 0; + return NGX_AGAIN; + } + + ngx_connection_error(c, err, "recv() failed"); + return NGX_STREAM_OK; + } + + if (n == 0) { + return NGX_STREAM_OK; + } + + c->buffer->last += n; + + rc = ph->handler(s); + + if (rc != NGX_AGAIN) { + c->buffer->last = c->buffer->pos; + return rc; + } + + if (c->buffer->last == c->buffer->end) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full"); + return NGX_STREAM_BAD_REQUEST; + } + + if (c->read->pending_eof) { + return NGX_STREAM_OK; + } + + c->buffer->last = c->buffer->pos; + + return NGX_AGAIN; +} + + +static ngx_int_t +ngx_stream_preread(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph) +{ + ssize_t n; + ngx_int_t rc; + ngx_connection_t *c; + + c = s->connection; + + while (c->read->ready) { + + n = c->recv(c, c->buffer->last, c->buffer->end - c->buffer->last); + + if (n == NGX_AGAIN) { + return NGX_AGAIN; + } + + if (n == NGX_ERROR || n == 0) { + return NGX_STREAM_OK; + } + + c->buffer->last += n; + + rc = ph->handler(s); + + if (rc != NGX_AGAIN) { + return rc; + } + + if (c->buffer->last == c->buffer->end) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full"); + return NGX_STREAM_BAD_REQUEST; + } + } + + return NGX_AGAIN; +} + + ngx_int_t ngx_stream_core_content_phase(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph)