nginx-0.0.1-2003-04-22-19:02:58 import

This commit is contained in:
Igor Sysoev 2003-04-22 15:02:58 +00:00
parent b1ab5d0606
commit 8dcd23e837
4 changed files with 70 additions and 503 deletions

View File

@ -12,19 +12,15 @@
#include <ngx_event_timer.h>
#include <ngx_kqueue_module.h>
#if (USE_KQUEUE) && !(HAVE_KQUEUE)
#error "kqueue is not supported on this platform"
#endif
/* STUB */
#define KQUEUE_NCHANGES 512
#define KQUEUE_NEVENTS 512
/* should be per-thread */
/* should be per-thread if threads are used without thread pool */
#if 1
int kq;
int kq;
#else
static int kq;
#endif
@ -60,7 +56,6 @@ int ngx_kqueue_init(int max_connections, ngx_log_t *log)
return NGX_ERROR;
}
#if !(USE_KQUEUE)
ngx_event_actions.add = ngx_kqueue_add_event;
ngx_event_actions.del = ngx_kqueue_del_event;
ngx_event_actions.timer = ngx_event_add_timer;
@ -89,8 +84,6 @@ int ngx_kqueue_init(int max_connections, ngx_log_t *log)
ngx_write_chain_proc = ngx_freebsd_write_chain;
#endif
#endif
return NGX_OK;

View File

@ -146,31 +146,33 @@ typedef struct {
} ngx_event_actions_t;
/* Event filter requires to read/write the whole data -
/* The event filter requires to read/write the whole data -
select, poll, /dev/poll, kqueue. */
#define NGX_HAVE_LEVEL_EVENT 1
/* Event filter is deleted after notification - select, poll, kqueue.
Using /dev/poll it can be implemented with additional syscall */
/* The event filter is deleted after a notification without an additional
syscall - select, poll, kqueue. */
#define NGX_HAVE_ONESHOT_EVENT 2
/* Event filter notifies only changes and initial level - kqueue */
/* The event filter notifies only the changes and an initial level - kqueue */
#define NGX_HAVE_CLEAR_EVENT 4
/* Event filter has kqueue features - eof flag, errno, available data, etc */
/* The event filter has kqueue features - the eof flag, errno,
available data, etc */
#define NGX_HAVE_KQUEUE_EVENT 8
/* Event filter supports low water mark - kqueue's NOTE_LOWAT,
early kqueue implementations have no NOTE_LOWAT so we need a separate flag */
/* The event filter supports low water mark - kqueue's NOTE_LOWAT.
Early kqueue implementations have no NOTE_LOWAT so we need a separate flag */
#define NGX_HAVE_LOWAT_EVENT 0x00000010
/* Event filter notifies only changes (edges) but not initial level - epoll */
/* The event filter notifies only the changes (the edges)
but not an initial level - epoll */
#define NGX_HAVE_EDGE_EVENT 0x00000020
/* No need to add or delete event filters - rt signals */
/* No need to add or delete the event filters - rt signals */
#define NGX_HAVE_SIGIO_EVENT 0x00000040
/* No need to add or delete event filters - overlapped, aio_read, aioread */
/* No need to add or delete the event filters - overlapped, aio_read, aioread */
#define NGX_HAVE_AIO_EVENT 0x00000080
/* Need to add socket or handle only once - i/o completion port.
@ -284,6 +286,11 @@ extern int ngx_event_flags;
#endif
#if !(HAVE_EPOLL)
#define ngx_edge_add_event(ev) NGX_ERROR
#endif
ssize_t ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size);
int ngx_event_close_connection(ngx_event_t *ev);

View File

@ -40,6 +40,27 @@ ngx_log_debug(p->log, "read upstream");
} else {
#if (HAVE_KQUEUE) /* kqueue notifies about the end of file or a pending error */
if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT) {
if (p->upstream->read->error) {
ngx_log_error(NGX_LOG_ERR, p->log,
p->upstream->read->error,
"readv() failed");
p->upstream_error = 1;
return NGX_ERROR;
} else if (p->upstream->read->eof
&& p->upstream->read->available == 0) {
p->upstream_eof = 1;
p->block_upstream = 0;
break;
}
}
#endif
/* use the free hunks if they exist */
if (p->free_hunks) {
@ -151,6 +172,7 @@ ngx_log_debug(p->log, "recv_chain: %d" _ n);
}
p->upstream_eof = 1;
p->block_upstream = 0;
break;
}
@ -399,7 +421,8 @@ int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
ngx_hunk_t *h;
ngx_chain_t *entry;
if (p->downstream_level == 0
if (p->upstream_level == 0
&& p->downstream_level == 0
&& p->busy_hunk == NULL
&& p->out_hunks == NULL
&& p->in_hunks == NULL

View File

@ -332,7 +332,8 @@ static int ngx_http_proxy_process_upstream(ngx_http_proxy_ctx_t *p,
if (c) {
p->cached_connection = 1;
p->connection = c;
c->write->event_handler = ngx_http_proxy_process_upstream_event;
c->write->event_handler = c->read->event_handler =
ngx_http_proxy_process_upstream_event;
rc = ngx_http_proxy_send_request(p);
} else {
@ -370,10 +371,13 @@ static int ngx_http_proxy_process_upstream(ngx_http_proxy_ctx_t *p,
return NGX_DONE;
}
if (rc == NGX_HTTP_BAD_GATEWAY || rc == NGX_HTTP_GATEWAY_TIME_OUT
|| (rc == NGX_OK
&& p->status == NGX_HTTP_INTERNAL_SERVER_ERROR
&& p->lcf->retry_500_error))
if (p->tries /* STUB !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! */
&& (rc == NGX_HTTP_BAD_GATEWAY
|| rc == NGX_HTTP_GATEWAY_TIME_OUT
|| (rc == NGX_OK
&& p->status == NGX_HTTP_INTERNAL_SERVER_ERROR
&& p->lcf->retry_500_error)))
{
if (ev) {
ngx_event_close_connection(ev);
@ -403,6 +407,8 @@ static int ngx_http_proxy_process_upstream(ngx_http_proxy_ctx_t *p,
return NGX_ERROR;
}
/* reinitialize the proxy context for the next upstream */
p->headers_in.server->key.len = 0;
p->headers_in.connection->key.len = 0;
p->headers_in.content_type->key.len = 0;
@ -441,8 +447,8 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
if (p->tries == p->upstreams->number) {
/* Here is the race condition
when the upstreams are shared between threads or processes
but it should not be serious */
when the upstreams are shared between
the threads or the processes but it should not be serious */
p->cur_upstream = p->upstreams->current++;
@ -465,9 +471,10 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
for ( ;; ) {
u = &p->upstreams->u[p->cur_upstream];
/* Here is the race condition
when the upstreams are shared between threads or processes
but it should not be serious */
when the upstreams are shared between
the threads or the processes but it should not be serious */
if (u->fails > p->upstreams->max_fails
|| u->accessed < p->upstreams->fail_timeout)
@ -502,7 +509,7 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
if (s == -1) {
ngx_log_error(NGX_LOG_ALERT, p->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (p->lcf->rcvbuf) {
@ -516,7 +523,7 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
ngx_close_socket_n " failed");
}
return NGX_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
@ -529,7 +536,7 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
ngx_close_socket_n " failed");
}
return NGX_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
c = &ngx_connections[s];
@ -549,26 +556,18 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
c->fd = s;
wev->close_handler = rev->close_handler = ngx_event_close_connection;
#if !(USE_KQUEUE)
#if (HAVE_EDGE_EVENT) /* epoll */
if (ngx_event_flags & NGX_HAVE_EDGE_EVENT) {
if (ngx_edge_add_event(wev) != NGX_OK) {
return NGX_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
#endif
#endif
ngx_test_null(c->pool, ngx_create_pool(p->lcf->conn_pool_size, p->log),
NGX_ERROR);
NGX_HTTP_INTERNAL_SERVER_ERROR);
ngx_test_null(p->sockaddr,
ngx_pcalloc(c->pool, sizeof(struct sockaddr_in)),
NGX_ERROR);
NGX_HTTP_INTERNAL_SERVER_ERROR);
addr = (struct sockaddr_in *) p->sockaddr;
@ -588,8 +587,6 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
ngx_close_socket_n " failed");
}
ngx_destroy_pool(c->pool);
return NGX_HTTP_BAD_GATEWAY;
}
}
@ -597,35 +594,22 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
c->data = p->request;
p->connection = c;
ngx_test_null(c->pool, ngx_create_pool(p->lcf->conn_pool_size, p->log),
NGX_ERROR);
if ((ngx_event_flags & NGX_HAVE_EDGE_EVENT) == 0) { /* not epoll */
#if (USE_KQUEUE)
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
return NGX_ERROR;
}
#else
if ((ngx_event_flags & NGX_HAVE_EDGE_EVENT) == 0) { /* not epoll */
if (ngx_event_flags & NGX_HAVE_CLEAR_EVENT) { /* kqueue */
if (ngx_event_flags & NGX_HAVE_CLEAR_EVENT) { /* kqueue */
event = NGX_CLEAR_EVENT;
} else { /* select, poll, /dev/poll */
} else { /* select, poll, /dev/poll */
event = NGX_LEVEL_EVENT;
}
/* TODO: aio, iocp */
if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
return NGX_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
#endif /* USE_KQUEUE */
wev->event_handler = rev->event_handler =
ngx_http_proxy_process_upstream_event;
@ -645,14 +629,6 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
wev->timer_set = 1;
ngx_add_timer(wev, p->lcf->connect_timeout);
#if (USE_KQUEUE)
if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
return NGX_ERROR;
}
#else
/* TODO: aio, iocp */
if (ngx_event_flags & NGX_HAVE_EDGE_EVENT) {
@ -660,11 +636,9 @@ static int ngx_http_proxy_connect(ngx_http_proxy_ctx_t *p)
}
if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
return NGX_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
#endif /* USE_KQUEUE */
return NGX_DONE;
}
@ -1123,436 +1097,6 @@ static int ngx_http_proxy_write_upstream_body(ngx_http_proxy_ctx_t *p)
}
#if 0
static int ngx_http_proxy_read_upstream_body(ngx_event_t *rev)
{
int rc, n, size, eof;
ngx_hunk_t *h;
ngx_chain_t *chain, *ce, *tce;
ngx_event_t *wev;
ngx_connection_t *c;
ngx_http_request_t *r;
ngx_http_proxy_ctx_t *p;
c = (ngx_connection_t *) rev->data;
r = (ngx_http_request_t *) c->data;
p = (ngx_http_proxy_ctx_t *)
ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx);
eof = 0;
for ( ;; ) {
#if (USE_KQUEUE)
if (ev->eof && ev->available == 0) {
eof = 1;
break;
}
#elif (HAVE_KQUEUE0)
if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT
&& ev->eof && ev->available == 0)
{
eof = 1;
break;
}
#endif
/* use the free hunks if they exist */
if (p->free_hunks) {
chain = p->free_hunks;
p->free_hunks = NULL;
/* allocate a new hunk if it's still allowed */
} else if (p->allocated < p->lcf->max_block_size) {
ngx_test_null(h,
ngx_create_temp_hunk(r->pool, p->block_size, 50, 50),
NGX_ERROR);
p->allocated += p->block_size;
ngx_test_null(tce, ngx_create_chain_entry(r->pool), NGX_ERROR);
tce->hunk = h;
tce->next = NULL;
chain = tce;
/* use the shadow hunks if they exist */
} else if (p->shadow_hunks) {
chain = p->shadow_hunks;
p->shadow_hunks = NULL;
/* write all the incoming hunks or the first hunk only
to a temporary file and convert them to the shadow hunks */
} else {
if (r->cachable) {
rc = ngx_http_proxy_write_chain_to_temp_file(p);
if (rc != NGX_OK) {
return rc;
}
} else {
tce = p->in_hunks->next;
p->in_hunks->next = NULL;
rc = ngx_http_proxy_write_chain_to_temp_file(p);
if (rc != NGX_OK) {
p->in_hunks = tce;
return rc;
}
p->in_hunks = tce;
}
}
n = ngx_recv_chain(c, chain);
if (n == NGX_ERROR) {
return NGX_ERROR;
}
if (n == NGX_AGAIN) {
return NGX_AGAIN;
}
if (n == 0) {
eof = 1;
break;
}
for (ce = chain; ce && n > 0; ce = ce->next) {
ngx_test_null(tce, ngx_create_chain_entry(r->pool), NGX_ERROR);
tce->hunk = ce->hunk;
tce->next = NULL;
if (p->last_in_hunk) {
p->last_in_hunk->next = tce;
p->last_in_hunk = tce;
} else {
p->last_in_hunk = tce;
}
size = ce->hunk->end - ce->hunk->last;
if (n >= size) {
n -= size;
ce->hunk->last = ce->hunk->end;
if (ce->hunk->shadow) {
ce->hunk->shadow->type &= ~(NGX_HUNK_TEMP
|NGX_HUNK_IN_MEMORY
|NGX_HUNK_RECYCLED);
ce->hunk->shadow->shadow = NULL;
}
continue;
}
ce->hunk->last += n;
if (ce->hunk->shadow) {
ce->hunk->shadow->type &= ~(NGX_HUNK_TEMP
|NGX_HUNK_IN_MEMORY
|NGX_HUNK_RECYCLED);
ce->hunk->shadow->shadow = NULL;
}
break;
}
if (ce) {
ce->next = p->free_hunks;
p->free_hunks = ce;
break;
}
}
wev = p->request->connection->write;
if (r->cachable) {
if (p->in_hunks) {
rc = ngx_http_proxy_write_chain_to_temp_file(p);
if (rc != NGX_OK) {
return rc;
}
}
if (p->out_hunks && wev->ready) {
return ngx_http_proxy_write_upstream_body(wev);
}
} else {
if ((p->out_hunks || p->in_hunks) && wev->ready) {
return ngx_http_proxy_write_upstream_body(wev);
}
}
return NGX_OK;
}
static int ngx_http_proxy_write_chain_to_temp_file(ngx_http_proxy_ctx_t *p)
{
int i, rc;
ngx_hunk_t *h;
ngx_chain_t *ce, *tce;
if (p->temp_file->fd == NGX_INVALID_FILE) {
rc = ngx_create_temp_file(p->temp_file, p->lcf->temp_path,
p->request->pool,
0, 2,
p->request->cachable);
if (rc != NGX_OK) {
return rc;
}
if (p->lcf->temp_file_warn) {
ngx_log_error(NGX_LOG_WARN, p->log, 0,
"an upstream response is buffered "
"to a temporary file");
}
}
if (ngx_write_chain_to_file(p->temp_file, p->in_hunks,
p->temp_offset, p->request->pool) == NGX_ERROR) {
return NGX_ERROR;
}
for (ce = p->in_hunks; ce; ce = ce->next) {
ngx_test_null(h, ngx_pcalloc(p->request->pool, sizeof(ngx_hunk_t)),
NGX_ERROR);
h->type = NGX_HUNK_FILE
|NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED;
ce->hunk->shadow = h;
h->shadow = ce->hunk;
h->file_pos = p->temp_offset;
p->temp_offset += ce->hunk->last - ce->hunk->pos;
h->file_last = p->temp_offset;
h->file->fd = p->temp_file->fd;
h->file->log = p->log;
h->pos = ce->hunk->pos;
h->last = ce->hunk->last;
h->start = ce->hunk->start;
h->end = ce->hunk->end;
h->pre_start = ce->hunk->pre_start;
h->post_end = ce->hunk->post_end;
ngx_test_null(tce, ngx_create_chain_entry(p->request->pool), NGX_ERROR);
tce->hunk = h;
tce->next = NULL;
if (p->last_out_hunk) {
p->last_out_hunk->next = tce;
p->last_out_hunk = tce;
} else {
p->last_out_hunk = tce;
}
}
return NGX_OK;
}
static int ngx_http_proxy_write_upstream_body(ngx_event_t *wev)
{
int rc;
ngx_hunk_t *h, *sh;
ngx_chain_t *ce;
ngx_connection_t *c;
ngx_http_request_t *r;
ngx_http_proxy_ctx_t *p;
c = (ngx_connection_t *) wev->data;
r = (ngx_http_request_t *) c->data;
p = (ngx_http_proxy_ctx_t *)
ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx);
while (p->out_hunks) {
h = p->out_hunks->hunk;
rc = ngx_http_output_filter(r, h);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}
if (rc == NGX_AGAIN || h->pos < h->last) {
return NGX_AGAIN;
}
p->out_hunks = p->out_hunks->next;
/* if the complete hunk has a shadow hunk
then add a shadow hunk to p->free_hunks chain */
sh = h->shadow;
if (sh) {
sh->pos = sh->last = sh->start;
ngx_test_null(ce, ngx_create_chain_entry(r->pool), NGX_ERROR);
ce->hunk = sh;
ce->next = p->free_hunks;
p->free_hunks = ce;
}
}
return NGX_OK;
}
#endif
static int ngx_http_proxy_read_response_body(ngx_event_t *ev)
{
int n;
char *buf;
size_t left, size;
ngx_hunk_t *h, **ph;
ngx_connection_t *c;
ngx_http_request_t *r;
ngx_http_proxy_ctx_t *p;
if (ev->timedout) {
return NGX_ERROR;
}
c = (ngx_connection_t *) ev->data;
r = (ngx_http_request_t *) c->data;
p = (ngx_http_proxy_ctx_t *)
ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx);
if (p->hunks.nelts > 0) {
h = ((ngx_hunk_t **) p->hunks.elts)[p->hunks.nelts - 1];
left = h->end - h->last;
} else {
h = NULL;
left = 0;
}
do {
#if (USE_KQUEUE)
/* do not allocate new block if there is EOF */
if (ev->eof && ev->available == 0) {
left = 1;
}
#elif (HAVE_KQUEUE)
if (ngx_event_type == NGX_HAVE_KQUEUE_EVENT) {
/* do not allocate new block if there is EOF */
if (ev->eof && ev->available == 0) {
left = 1;
}
}
#endif
if (left == 0) {
ngx_test_null(ph, ngx_push_array(&p->hunks), NGX_ERROR);
ngx_test_null(h,
ngx_create_temp_hunk(r->pool,
/* STUB */ 4096 /**/, 0, 0),
NGX_ERROR);
h->type = NGX_HUNK_MEMORY|NGX_HUNK_IN_MEMORY;
*ph = h;
}
if (h != NULL) {
buf = h->last;
size = h->end - h->last;
} else {
buf = (char *) &buf;
size = 0;
}
n = ngx_event_recv(c, buf, size);
ngx_log_debug(c->log, "READ:%d" _ n);
if (n == NGX_AGAIN) {
return NGX_DONE;
}
if (n == NGX_ERROR) {
return NGX_ERROR;
}
h->last += n;
left = h->end - h->last;
/* STUB */
*h->last = '\0';
ngx_log_debug(c->log, "PROXY:\n'%s'" _ h->pos);
/**/
} while (n > 0 && left == 0);
if (n == 0) {
ngx_log_debug(c->log, "CLOSE proxy");
#if 0
ngx_del_event(ev, NGX_READ_EVENT, NGX_CLOSE_EVENT);
#endif
ngx_event_close_connection(ev);
p->hunk_n = 0;
c->write->event_handler = ngx_http_proxy_write_to_client;
return ngx_http_proxy_write_to_client(c->write);
}
/* STUB */ return NGX_DONE;
}
static int ngx_http_proxy_write_to_client(ngx_event_t *ev)
{
int rc;
ngx_hunk_t *h;
ngx_connection_t *c;
ngx_http_request_t *r;
ngx_http_proxy_ctx_t *p;
c = (ngx_connection_t *) ev->data;
r = (ngx_http_request_t *) c->data;
p = (ngx_http_proxy_ctx_t *)
ngx_http_get_module_ctx(r, ngx_http_proxy_module_ctx);
do {
h = ((ngx_hunk_t **) p->hunks.elts)[p->hunk_n];
rc = ngx_http_output_filter(r, h);
if (rc != NGX_OK) {
return rc;
}
if (p->hunk_n >= p->hunks.nelts) {
break;
}
p->hunk_n++;
} while (rc == NGX_OK);
return NGX_OK;
}
static int ngx_http_proxy_finalize_request(ngx_http_proxy_ctx_t *p, int error)