Welcome! Log In Create A New Profile

Advanced

upstream keepalive: call for testing

Maxim Dounin
July 26, 2011 07:58AM
Hello!

Attached patch (against 1.0.5) introduces upstream keepalive
support for memcached, fastcgi and http. Note the patch is
experimental and may have problems (though it passes basic smoke
tests here). Testing is appreciated.

Major changes include:

1. Content length is now parsed into u->headers_in.content_length_n
instead of directly into r->headers_out.content_length_n.
Note that this may break 3rd party protocol modules.

2. Use off_t for r->upstream->length.
Note this may break 3rd party protocol modules.

3. In buffered mode, u->pipe->length was introduced to indicate the minimal
amount of data which must be passed to the input filter. This makes it possible
not to rely on connection close to indicate the response end while still using
buffers efficiently in most cases. It defaults to -1 (that is, wait for
connection close) if not set by protocol-specific handlers.

4. In buffered mode, u->input_filter_init() is now called if it is set.
This is used to set the initial value of u->pipe->length (see above).

5. The proxy module is now able to talk HTTP/1.1; in particular, it understands
chunked encoding in responses. Requests are sent using HTTP/1.1
if the proxy_http_version directive is set to 1.1.

6. Introduced the u->keepalive flag to indicate that the connection to the
upstream is in a correct state and may be kept alive. The memcached, fastcgi
and proxy modules are updated to set it.

The patch is expected to be used with the upstream keepalive module[1] compiled with
NGX_UPSTREAM_KEEPALIVE_PATCHED defined, i.e., use something like this:

./configure --with-cc-opt="-D NGX_UPSTREAM_KEEPALIVE_PATCHED" \
--add-module=/path/to/ngx_http_upstream_keepalive

And use something like this in config:

upstream memcached_backend {
server 127.0.0.1:11211;
keepalive 1;
}

upstream http_backend {
server 127.0.0.1:8080;
keepalive 1;
}

upstream fastcgi_backend {
server 127.0.0.1:9000;
keepalive 1;
}

server {
...

location /memcached/ {
set $memcached_key $uri;
memcached_pass memcached_backend;
}

location /fastcgi/ {
fastcgi_pass fastcgi_backend;
...
}

location /http/ {
proxy_pass http://http_backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
...
}
}

[1] http://mdounin.ru/hg/ngx_http_upstream_keepalive/

Maxim Dounin
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -392,8 +392,30 @@ ngx_event_pipe_read_upstream(ngx_event_p
cl->buf->file_last - cl->buf->file_pos);
}

+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe length: %O", p->length);
+
#endif

+ if (p->free_raw_bufs && p->length != -1) {
+ cl = p->free_raw_bufs;
+
+ if (cl->buf->last - cl->buf->pos >= p->length) {
+
+ /* STUB */ cl->buf->num = p->num++;
+
+ if (p->input_filter(p, cl->buf) == NGX_ERROR) {
+ return NGX_ABORT;
+ }
+
+ p->free_raw_bufs = cl->next;
+ }
+ }
+
+ if (p->length == 0) {
+ p->upstream_done = 1;
+ }
+
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

/* STUB */ p->free_raw_bufs->buf->num = p->num++;
@@ -848,6 +870,12 @@ ngx_event_pipe_copy_input_filter(ngx_eve
}
p->last_in = &cl->next;

+ if (p->length == -1) {
+ return NGX_OK;
+ }
+
+ p->length -= b->last - b->pos;
+
return NGX_OK;
}

diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -65,6 +65,7 @@ struct ngx_event_pipe_s {
ssize_t busy_size;

off_t read_length;
+ off_t length;

off_t max_temp_file_size;
ssize_t temp_file_write_size;
diff --git a/src/http/modules/ngx_http_fastcgi_module.c b/src/http/modules/ngx_http_fastcgi_module.c
--- a/src/http/modules/ngx_http_fastcgi_module.c
+++ b/src/http/modules/ngx_http_fastcgi_module.c
@@ -77,6 +77,8 @@ typedef struct {

#define NGX_HTTP_FASTCGI_RESPONDER 1

+#define NGX_HTTP_FASTCGI_KEEP_CONN 1
+
#define NGX_HTTP_FASTCGI_BEGIN_REQUEST 1
#define NGX_HTTP_FASTCGI_ABORT_REQUEST 2
#define NGX_HTTP_FASTCGI_END_REQUEST 3
@@ -130,6 +132,7 @@ static ngx_int_t ngx_http_fastcgi_create
static ngx_int_t ngx_http_fastcgi_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_fastcgi_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r);
+static ngx_int_t ngx_http_fastcgi_input_filter_init(void *data);
static ngx_int_t ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p,
ngx_buf_t *buf);
static ngx_int_t ngx_http_fastcgi_process_record(ngx_http_request_t *r,
@@ -484,7 +487,7 @@ static ngx_http_fastcgi_request_start_t

{ 0, /* role_hi */
NGX_HTTP_FASTCGI_RESPONDER, /* role_lo */
- 0, /* NGX_HTTP_FASTCGI_KEEP_CONN */ /* flags */
+ NGX_HTTP_FASTCGI_KEEP_CONN, /* flags */
{ 0, 0, 0, 0, 0 } }, /* reserved[5] */

{ 1, /* version */
@@ -600,6 +603,8 @@ ngx_http_fastcgi_handler(ngx_http_reques
u->pipe->input_filter = ngx_http_fastcgi_input_filter;
u->pipe->input_ctx = r;

+ u->input_filter_init = ngx_http_fastcgi_input_filter_init;
+
rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);

if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
@@ -1566,6 +1571,17 @@ ngx_http_fastcgi_process_header(ngx_http


static ngx_int_t
+ngx_http_fastcgi_input_filter_init(void *data)
+{
+ ngx_http_request_t *r = data;
+
+ r->upstream->pipe->length = sizeof(ngx_http_fastcgi_header_t);
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
{
u_char *m, *msg;
@@ -1603,7 +1619,6 @@ ngx_http_fastcgi_input_filter(ngx_event_

if (f->type == NGX_HTTP_FASTCGI_STDOUT && f->length == 0) {
f->state = ngx_http_fastcgi_st_version;
- p->upstream_done = 1;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0,
"http fastcgi closed stdout");
@@ -1614,6 +1629,7 @@ ngx_http_fastcgi_input_filter(ngx_event_
if (f->type == NGX_HTTP_FASTCGI_END_REQUEST) {
f->state = ngx_http_fastcgi_st_version;
p->upstream_done = 1;
+ r->upstream->keepalive = 1;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0,
"http fastcgi sent end request");
@@ -1773,6 +1789,20 @@ ngx_http_fastcgi_input_filter(ngx_event_

}

+ /* set p->length, minimal amount of data we want to see */
+
+ if (f->state < ngx_http_fastcgi_st_data) {
+ p->length = 1;
+
+ } else if (f->state == ngx_http_fastcgi_st_padding) {
+ p->length = f->padding;
+
+ } else {
+ /* ngx_http_fastcgi_st_data */
+
+ p->length = f->length;
+ }
+
if (b) {
b->shadow = buf;
b->last_shadow = 1;
diff --git a/src/http/modules/ngx_http_memcached_module.c b/src/http/modules/ngx_http_memcached_module.c
--- a/src/http/modules/ngx_http_memcached_module.c
+++ b/src/http/modules/ngx_http_memcached_module.c
@@ -344,8 +344,8 @@ found:

while (*p && *p++ != CR) { /* void */ }

- r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
- if (r->headers_out.content_length_n == -1) {
+ u->headers_in.content_length_n = ngx_atoof(len, p - len - 1);
+ if (u->headers_in.content_length_n == -1) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"memcached sent invalid length in response \"%V\" "
"for key \"%V\"",
@@ -366,6 +366,7 @@ found:

u->headers_in.status_n = 404;
u->state->status = 404;
+ u->keepalive = 1;

return NGX_OK;
}
@@ -426,6 +427,10 @@ ngx_http_memcached_filter(void *data, ss
u->length -= bytes;
ctx->rest -= bytes;

+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}

@@ -463,6 +468,13 @@ ngx_http_memcached_filter(void *data, ss
if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) {
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"memcached sent invalid trailer");
+
+ b->last = last;
+ cl->buf->last = last;
+ u->length = 0;
+ ctx->rest = 0;
+
+ return NGX_OK;
}

ctx->rest -= b->last - last;
@@ -470,6 +482,10 @@ ngx_http_memcached_filter(void *data, ss
cl->buf->last = last;
u->length = ctx->rest;

+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}

diff --git a/src/http/modules/ngx_http_proxy_module.c b/src/http/modules/ngx_http_proxy_module.c
--- a/src/http/modules/ngx_http_proxy_module.c
+++ b/src/http/modules/ngx_http_proxy_module.c
@@ -71,6 +71,8 @@ typedef struct {

ngx_flag_t redirect;

+ ngx_uint_t http_version;
+
ngx_uint_t headers_hash_max_size;
ngx_uint_t headers_hash_bucket_size;
} ngx_http_proxy_loc_conf_t;
@@ -80,6 +82,12 @@ typedef struct {
ngx_http_status_t status;
ngx_http_proxy_vars_t vars;
size_t internal_body_length;
+
+ ngx_uint_t state;
+ off_t size;
+ off_t length;
+
+ ngx_uint_t head; /* unsigned head:1 */
} ngx_http_proxy_ctx_t;


@@ -92,6 +100,15 @@ static ngx_int_t ngx_http_proxy_create_r
static ngx_int_t ngx_http_proxy_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_proxy_process_status_line(ngx_http_request_t *r);
static ngx_int_t ngx_http_proxy_process_header(ngx_http_request_t *r);
+static ngx_int_t ngx_http_proxy_input_filter_init(void *data);
+static ngx_int_t ngx_http_proxy_copy_filter(ngx_event_pipe_t *p,
+ ngx_buf_t *buf);
+static ngx_int_t ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p,
+ ngx_buf_t *buf);
+static ngx_int_t ngx_http_proxy_non_buffered_copy_filter(void *data,
+ ssize_t bytes);
+static ngx_int_t ngx_http_proxy_non_buffered_chunked_filter(void *data,
+ ssize_t bytes);
static void ngx_http_proxy_abort_request(ngx_http_request_t *r);
static void ngx_http_proxy_finalize_request(ngx_http_request_t *r,
ngx_int_t rc);
@@ -157,6 +174,13 @@ static ngx_conf_bitmask_t ngx_http_prox
};


+static ngx_conf_enum_t ngx_http_proxy_http_version[] = {
+ { ngx_string("1.0"), NGX_HTTP_VERSION_10 },
+ { ngx_string("1.1"), NGX_HTTP_VERSION_11 },
+ { ngx_null_string, 0 }
+};
+
+
ngx_module_t ngx_http_proxy_module;


@@ -432,6 +456,13 @@ static ngx_command_t ngx_http_proxy_com
offsetof(ngx_http_proxy_loc_conf_t, upstream.ignore_headers),
&ngx_http_upstream_ignore_headers_masks },

+ { ngx_string("proxy_http_version"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_enum_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_proxy_loc_conf_t, http_version),
+ &ngx_http_proxy_http_version },
+
#if (NGX_HTTP_SSL)

{ ngx_string("proxy_ssl_session_reuse"),
@@ -479,6 +510,7 @@ ngx_module_t ngx_http_proxy_module = {


static char ngx_http_proxy_version[] = " HTTP/1.0" CRLF;
+static char ngx_http_proxy_version_11[] = " HTTP/1.1" CRLF;


static ngx_keyval_t ngx_http_proxy_headers[] = {
@@ -486,6 +518,7 @@ static ngx_keyval_t ngx_http_proxy_head
{ ngx_string("Connection"), ngx_string("close") },
{ ngx_string("Keep-Alive"), ngx_string("") },
{ ngx_string("Expect"), ngx_string("") },
+ { ngx_string("Upgrade"), ngx_string("") },
{ ngx_null_string, ngx_null_string }
};

@@ -610,7 +643,12 @@ ngx_http_proxy_handler(ngx_http_request_
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

- u->pipe->input_filter = ngx_event_pipe_copy_input_filter;
+ u->pipe->input_filter = ngx_http_proxy_copy_filter;
+ u->pipe->input_ctx = r;
+
+ u->input_filter_init = ngx_http_proxy_input_filter_init;
+ u->input_filter = ngx_http_proxy_non_buffered_copy_filter;
+ u->input_filter_ctx = r;

u->accel = 1;

@@ -864,14 +902,20 @@ ngx_http_proxy_create_request(ngx_http_r
method.len++;
}

+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ if (method.len == 5
+ && ngx_strncasecmp(method.data, (u_char *) "HEAD ", 5) == 0)
+ {
+ ctx->head = 1;
+ }
+
len = method.len + sizeof(ngx_http_proxy_version) - 1 + sizeof(CRLF) - 1;

escape = 0;
loc_len = 0;
unparsed_uri = 0;

- ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
-
if (plcf->proxy_lengths) {
uri_len = ctx->vars.uri.len;

@@ -1007,8 +1051,14 @@ ngx_http_proxy_create_request(ngx_http_r

u->uri.len = b->last - u->uri.data;

- b->last = ngx_cpymem(b->last, ngx_http_proxy_version,
- sizeof(ngx_http_proxy_version) - 1);
+ if (plcf->http_version == NGX_HTTP_VERSION_11) {
+ b->last = ngx_cpymem(b->last, ngx_http_proxy_version_11,
+ sizeof(ngx_http_proxy_version_11) - 1);
+
+ } else {
+ b->last = ngx_cpymem(b->last, ngx_http_proxy_version,
+ sizeof(ngx_http_proxy_version) - 1);
+ }

ngx_memzero(&e, sizeof(ngx_http_script_engine_t));

@@ -1157,8 +1207,11 @@ ngx_http_proxy_reinit_request(ngx_http_r
ctx->status.count = 0;
ctx->status.start = NULL;
ctx->status.end = NULL;
+ ctx->state = 0;

r->upstream->process_header = ngx_http_proxy_process_status_line;
+ r->upstream->pipe->input_filter = ngx_http_proxy_copy_filter;
+ r->upstream->input_filter = ngx_http_proxy_non_buffered_copy_filter;
r->state = 0;

return NGX_OK;
@@ -1244,6 +1297,8 @@ ngx_http_proxy_process_header(ngx_http_r
{
ngx_int_t rc;
ngx_table_elt_t *h;
+ ngx_http_upstream_t *u;
+ ngx_http_proxy_ctx_t *ctx;
ngx_http_upstream_header_t *hh;
ngx_http_upstream_main_conf_t *umcf;

@@ -1339,6 +1394,23 @@ ngx_http_proxy_process_header(ngx_http_r
h->lowcase_key = (u_char *) "date";
}

+ /*
+ * set u->keepalive if response has no body; this allows to keep
+ * connections alive in case of r->header_only or X-Accel-Redirect
+ */
+
+ u = r->upstream;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
+ || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
+ || ctx->head
+ || (!u->headers_in.chunked
+ && u->headers_in.content_length_n == 0))
+ {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}

@@ -1356,6 +1428,690 @@ ngx_http_proxy_process_header(ngx_http_r
}


+static ngx_int_t
+ngx_http_proxy_input_filter_init(void *data)
+{
+ ngx_http_request_t *r = data;
+ ngx_http_upstream_t *u;
+ ngx_http_proxy_ctx_t *ctx;
+
+ u = r->upstream;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy filter init s:%d h:%d c:%d l:%O",
+ u->headers_in.status_n, ctx->head, u->headers_in.chunked,
+ u->headers_in.content_length_n);
+
+ /* as per RFC2616, 4.4 Message Length */
+
+ if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
+ || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
+ || ctx->head)
+ {
+ /* 1xx, 204, and 304 and replies to HEAD requests */
+ /* no 1xx since we don't send Expect and Upgrade */
+
+ u->pipe->length = 0;
+ u->length = 0;
+ u->keepalive = 1;
+
+ } else if (u->headers_in.chunked) {
+ /* chunked */
+
+ u->pipe->input_filter = ngx_http_proxy_chunked_filter;
+ u->pipe->length = 3; /* "0" LF LF */
+
+ u->input_filter = ngx_http_proxy_non_buffered_chunked_filter;
+ u->length = -1;
+
+ } else if (u->headers_in.content_length_n == 0) {
+ /* empty body: special case as filter won't be called */
+
+ u->pipe->length = 0;
+ u->length = 0;
+ u->keepalive = 1;
+
+ } else {
+ /* content length or connection close */
+
+ u->pipe->length = u->headers_in.content_length_n;
+ u->length = u->headers_in.content_length_n;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_copy_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
+{
+ ngx_buf_t *b;
+ ngx_chain_t *cl;
+ ngx_http_request_t *r;
+
+ if (buf->pos == buf->last) {
+ return NGX_OK;
+ }
+
+ if (p->free) {
+ cl = p->free;
+ b = cl->buf;
+ p->free = cl->next;
+ ngx_free_chain(p->pool, cl);
+
+ } else {
+ b = ngx_alloc_buf(p->pool);
+ if (b == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ ngx_memcpy(b, buf, sizeof(ngx_buf_t));
+ b->shadow = buf;
+ b->tag = p->tag;
+ b->last_shadow = 1;
+ b->recycled = 1;
+ buf->shadow = b;
+
+ cl = ngx_alloc_chain_link(p->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = b;
+ cl->next = NULL;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
+
+ if (p->in) {
+ *p->last_in = cl;
+ } else {
+ p->in = cl;
+ }
+ p->last_in = &cl->next;
+
+ if (p->length == -1) {
+ return NGX_OK;
+ }
+
+ p->length -= b->last - b->pos;
+
+ if (p->length == 0) {
+ r = p->input_ctx;
+ p->upstream_done = 1;
+ r->upstream->keepalive = 1;
+
+ } else if (p->length < 0) {
+ r = p->input_ctx;
+ p->upstream_done = 1;
+
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "upstream sent too many data");
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_inline ngx_int_t
+ngx_http_proxy_parse_chunked(ngx_http_request_t *r, ngx_buf_t *buf)
+{
+ u_char *pos, ch, c;
+ ngx_int_t rc;
+ ngx_http_proxy_ctx_t *ctx;
+ enum {
+ sw_chunk_start = 0,
+ sw_chunk_size,
+ sw_chunk_extension,
+ sw_chunk_extension_almost_done,
+ sw_chunk_data,
+ sw_after_data,
+ sw_after_data_almost_done,
+ sw_last_chunk_extension,
+ sw_last_chunk_extension_almost_done,
+ sw_trailer,
+ sw_trailer_almost_done,
+ sw_trailer_header,
+ sw_trailer_header_almost_done
+ } state;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+ state = ctx->state;
+
+ if (state == sw_chunk_data && ctx->size == 0) {
+ state = sw_after_data;
+ }
+
+ rc = NGX_AGAIN;
+
+ for (pos = buf->pos; pos < buf->last; pos++) {
+
+ ch = *pos;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy chunked byte: %02Xd s:%d", ch, state);
+
+ switch (state) {
+
+ case sw_chunk_start:
+ if (ch >= '0' && ch <= '9') {
+ state = sw_chunk_size;
+ ctx->size = ch - '0';
+ break;
+ }
+
+ c = (u_char) (ch | 0x20);
+
+ if (c >= 'a' && c <= 'f') {
+ state = sw_chunk_size;
+ ctx->size = c - 'a' + 10;
+ break;
+ }
+
+ goto invalid;
+
+ case sw_chunk_size:
+ if (ch >= '0' && ch <= '9') {
+ ctx->size = ctx->size * 16 + (ch - '0');
+ break;
+ }
+
+ c = (u_char) (ch | 0x20);
+
+ if (c >= 'a' && c <= 'f') {
+ ctx->size = ctx->size * 16 + (c - 'a' + 10);
+ break;
+ }
+
+ if (ctx->size == 0) {
+
+ switch (ch) {
+ case CR:
+ state = sw_last_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_trailer;
+ break;
+ case ';':
+ state = sw_last_chunk_extension;
+ break;
+ default:
+ goto invalid;
+ }
+
+ break;
+ }
+
+ switch (ch) {
+ case CR:
+ state = sw_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_chunk_data;
+ break;
+ case ';':
+ state = sw_chunk_extension;
+ break;
+ default:
+ goto invalid;
+ }
+
+ break;
+
+ case sw_chunk_extension:
+ switch (ch) {
+ case CR:
+ state = sw_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_chunk_data;
+ }
+ break;
+
+ case sw_chunk_extension_almost_done:
+ if (ch == LF) {
+ state = sw_chunk_data;
+ break;
+ }
+ goto invalid;
+
+ case sw_chunk_data:
+ rc = NGX_OK;
+ goto data;
+
+ case sw_after_data:
+ switch (ch) {
+ case CR:
+ state = sw_after_data_almost_done;
+ break;
+ case LF:
+ state = sw_chunk_start;
+ }
+ break;
+
+ case sw_after_data_almost_done:
+ if (ch == LF) {
+ state = sw_chunk_start;
+ break;
+ }
+ goto invalid;
+
+ case sw_last_chunk_extension:
+ switch (ch) {
+ case CR:
+ state = sw_last_chunk_extension_almost_done;
+ break;
+ case LF:
+ state = sw_trailer;
+ }
+ break;
+
+ case sw_last_chunk_extension_almost_done:
+ if (ch == LF) {
+ state = sw_trailer;
+ break;
+ }
+ goto invalid;
+
+ case sw_trailer:
+ switch (ch) {
+ case CR:
+ state = sw_trailer_almost_done;
+ break;
+ case LF:
+ goto done;
+ default:
+ state = sw_trailer_header;
+ }
+ break;
+
+ case sw_trailer_almost_done:
+ if (ch == LF) {
+ goto done;
+ }
+ goto invalid;
+
+ case sw_trailer_header:
+ switch (ch) {
+ case CR:
+ state = sw_trailer_header_almost_done;
+ break;
+ case LF:
+ state = sw_trailer;
+ }
+ break;
+
+ case sw_trailer_header_almost_done:
+ if (ch == LF) {
+ state = sw_trailer;
+ break;
+ }
+ goto invalid;
+
+ }
+ }
+
+data:
+
+ ctx->state = state;
+ buf->pos = pos;
+
+ switch (state) {
+
+ case sw_chunk_start:
+ ctx->length = 3 /* "0" LF LF */;
+ break;
+ case sw_chunk_size:
+ ctx->length = 2 /* LF LF */
+ + (ctx->size ? ctx->size + 4 /* LF "0" LF LF */ : 0);
+ break;
+ case sw_chunk_extension:
+ case sw_chunk_extension_almost_done:
+ ctx->length = 1 /* LF */ + ctx->size + 4 /* LF "0" LF LF */;
+ break;
+ case sw_chunk_data:
+ ctx->length = ctx->size + 4 /* LF "0" LF LF */;
+ break;
+ case sw_after_data:
+ case sw_after_data_almost_done:
+ ctx->length = 4 /* LF "0" LF LF */;
+ break;
+ case sw_last_chunk_extension:
+ case sw_last_chunk_extension_almost_done:
+ ctx->length = 2 /* LF LF */;
+ break;
+ case sw_trailer:
+ case sw_trailer_almost_done:
+ ctx->length = 1 /* LF */;
+ break;
+ case sw_trailer_header:
+ case sw_trailer_header_almost_done:
+ ctx->length = 2 /* LF LF */;
+ break;
+
+ }
+
+ return rc;
+
+done:
+
+ return NGX_DONE;
+
+invalid:
+
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "upstream sent invalid chunked response");
+
+ return NGX_ERROR;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_chunked_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
+{
+ ngx_int_t rc;
+ ngx_buf_t *b, **prev;
+ ngx_chain_t *cl;
+ ngx_http_request_t *r;
+ ngx_http_proxy_ctx_t *ctx;
+
+ if (buf->pos == buf->last) {
+ return NGX_OK;
+ }
+
+ r = p->input_ctx;
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+
+ b = NULL;
+ prev = &buf->shadow;
+
+ for ( ;; ) {
+
+ rc = ngx_http_proxy_parse_chunked(r, buf);
+
+ if (rc == NGX_OK) {
+
+ /* a chunk has been parsed successfully */
+
+ if (p->free) {
+ cl = p->free;
+ b = cl->buf;
+ p->free = cl->next;
+ ngx_free_chain(p->pool, cl);
+
+ } else {
+ b = ngx_alloc_buf(p->pool);
+ if (b == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ ngx_memzero(b, sizeof(ngx_buf_t));
+
+ b->pos = buf->pos;
+ b->start = buf->start;
+ b->end = buf->end;
+ b->tag = p->tag;
+ b->temporary = 1;
+ b->recycled = 1;
+
+ *prev = b;
+ prev = &b->shadow;
+
+ cl = ngx_alloc_chain_link(p->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = b;
+ cl->next = NULL;
+
+ if (p->in) {
+ *p->last_in = cl;
+ } else {
+ p->in = cl;
+ }
+ p->last_in = &cl->next;
+
+ /* STUB */ b->num = buf->num;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "input buf #%d %p", b->num, b->pos);
+
+ if (buf->last - buf->pos >= ctx->size) {
+
+ buf->pos += ctx->size;
+ b->last = buf->pos;
+ ctx->size = 0;
+
+ continue;
+ }
+
+ ctx->size -= buf->last - buf->pos;
+ buf->pos = buf->last;
+ b->last = buf->last;
+
+ continue;
+ }
+
+ if (rc == NGX_DONE) {
+
+ /* a whole response has been parsed successfully */
+
+ p->upstream_done = 1; /* or p->length = 0; ? */
+ r->upstream->keepalive = 1;
+
+ break;
+ }
+
+ if (rc == NGX_AGAIN) {
+
+ /* set p->length, minimal amount of data we want to see */
+
+ p->length = ctx->length;
+
+ break;
+ }
+
+ /* invalid response */
+
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "upstream sent invalid chunked response");
+
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy chunked state %d, length %d",
+ ctx->state, p->length);
+
+ if (b) {
+ b->shadow = buf;
+ b->last_shadow = 1;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "input buf %p %z", b->pos, b->last - b->pos);
+
+ return NGX_OK;
+ }
+
+ /* there is no data record in the buf, add it to free chain */
+
+ if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_non_buffered_copy_filter(void *data, ssize_t bytes)
+{
+ ngx_http_request_t *r = data;
+
+ ngx_buf_t *b;
+ ngx_chain_t *cl, **ll;
+ ngx_http_upstream_t *u;
+
+ u = r->upstream;
+
+ for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
+ ll = &cl->next;
+ }
+
+ cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ *ll = cl;
+
+ cl->buf->flush = 1;
+ cl->buf->memory = 1;
+
+ b = &u->buffer;
+
+ cl->buf->pos = b->last;
+ b->last += bytes;
+ cl->buf->last = b->last;
+ cl->buf->tag = u->output.tag;
+
+ if (u->length == -1) {
+ return NGX_OK;
+ }
+
+ u->length -= bytes;
+
+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_proxy_non_buffered_chunked_filter(void *data, ssize_t bytes)
+{
+ ngx_http_request_t *r = data;
+
+ ngx_int_t rc;
+ ngx_buf_t *b, *buf;
+ ngx_chain_t *cl, **ll;
+ ngx_http_upstream_t *u;
+ ngx_http_proxy_ctx_t *ctx;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);
+ u = r->upstream;
+ buf = &u->buffer;
+
+ buf->pos = buf->last;
+ buf->last += bytes;
+
+ for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
+ ll = &cl->next;
+ }
+
+ for ( ;; ) {
+
+ rc = ngx_http_proxy_parse_chunked(r, buf);
+
+ if (rc == NGX_OK) {
+
+ /* a chunk has been parsed successfully */
+
+ cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ *ll = cl;
+ ll = &cl->next;
+
+ b = cl->buf;
+
+ b->flush = 1;
+ b->memory = 1;
+
+ b->pos = buf->pos;
+ b->tag = u->output.tag;
+
+ if (buf->last - buf->pos >= ctx->size) {
+ buf->pos += ctx->size;
+ b->last = buf->pos;
+ ctx->size = 0;
+
+ } else {
+ ctx->size -= buf->last - buf->pos;
+ buf->pos = buf->last;
+ b->last = buf->last;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy out buf %p %z",
+ b->pos, b->last - b->pos);
+
+ continue;
+ }
+
+ if (rc == NGX_DONE) {
+
+ /* a whole response has been parsed successfully */
+
+ u->keepalive = 1;
+ u->length = 0;
+
+ break;
+ }
+
+ if (rc == NGX_AGAIN) {
+ break;
+ }
+
+ /* invalid response */
+
+ ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
+ "upstream sent invalid chunked response");
+
+ return NGX_ERROR;
+ }
+
+ /* provide continuous buffer for subrequests in memory */
+
+ if (r->subrequest_in_memory) {
+
+ cl = u->out_bufs;
+
+ if (cl) {
+ buf->pos = cl->buf->pos;
+ }
+
+ buf->last = buf->pos;
+
+ for (cl = u->out_bufs; cl; cl = cl->next) {
+ ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http proxy in memory %p-%p %uz",
+ cl->buf->pos, cl->buf->last, ngx_buf_size(cl->buf));
+
+ if (buf->last == cl->buf->pos) {
+ buf->last = cl->buf->last;
+ continue;
+ }
+
+ buf->last = ngx_movemem(buf->last, cl->buf->pos,
+ cl->buf->last - cl->buf->pos);
+
+ cl->buf->pos = buf->last - (cl->buf->last - cl->buf->pos);
+ cl->buf->last = buf->last;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
static void
ngx_http_proxy_abort_request(ngx_http_request_t *r)
{
@@ -1704,6 +2460,8 @@ ngx_http_proxy_create_loc_conf(ngx_conf_
conf->redirect = NGX_CONF_UNSET;
conf->upstream.change_buffering = 1;

+ conf->http_version = NGX_CONF_UNSET_UINT;
+
conf->headers_hash_max_size = NGX_CONF_UNSET_UINT;
conf->headers_hash_bucket_size = NGX_CONF_UNSET_UINT;

@@ -2005,6 +2763,9 @@ ngx_http_proxy_merge_loc_conf(ngx_conf_t
}
#endif

+ ngx_conf_merge_uint_value(conf->http_version, prev->http_version,
+ NGX_HTTP_VERSION_10);
+
ngx_conf_merge_uint_value(conf->headers_hash_max_size,
prev->headers_hash_max_size, 512);

diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -72,6 +72,8 @@ static void ngx_http_upstream_finalize_r

static ngx_int_t ngx_http_upstream_process_header_line(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
+static ngx_int_t ngx_http_upstream_process_content_length(ngx_http_request_t *r,
+ ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_process_set_cookie(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t
@@ -89,6 +91,9 @@ static ngx_int_t ngx_http_upstream_proce
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_process_charset(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
+static ngx_int_t
+ ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r,
+ ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_copy_header_line(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t
@@ -96,8 +101,6 @@ static ngx_int_t
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_copy_content_type(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
-static ngx_int_t ngx_http_upstream_copy_content_length(ngx_http_request_t *r,
- ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_copy_last_modified(ngx_http_request_t *r,
ngx_table_elt_t *h, ngx_uint_t offset);
static ngx_int_t ngx_http_upstream_rewrite_location(ngx_http_request_t *r,
@@ -149,9 +152,9 @@ ngx_http_upstream_header_t ngx_http_ups
ngx_http_upstream_copy_content_type, 0, 1 },

{ ngx_string("Content-Length"),
- ngx_http_upstream_process_header_line,
+ ngx_http_upstream_process_content_length,
offsetof(ngx_http_upstream_headers_in_t, content_length),
- ngx_http_upstream_copy_content_length, 0, 0 },
+ ngx_http_upstream_ignore_header_line, 0, 0 },

{ ngx_string("Date"),
ngx_http_upstream_process_header_line,
@@ -247,6 +250,10 @@ ngx_http_upstream_header_t ngx_http_ups
ngx_http_upstream_process_charset, 0,
ngx_http_upstream_copy_header_line, 0, 0 },

+ { ngx_string("Transfer-Encoding"),
+ ngx_http_upstream_process_transfer_encoding, 0,
+ ngx_http_upstream_ignore_header_line, 0, 0 },
+
#if (NGX_HTTP_GZIP)
{ ngx_string("Content-Encoding"),
ngx_http_upstream_process_header_line,
@@ -396,6 +403,8 @@ ngx_http_upstream_create(ngx_http_reques
r->cache = NULL;
#endif

+ u->headers_in.content_length_n = -1;
+
return NGX_OK;
}

@@ -798,6 +807,7 @@ ngx_http_upstream_cache_send(ngx_http_re
u->buffer.pos += c->header_start;

ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
+ u->headers_in.content_length_n = -1;

if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
@@ -1280,7 +1290,10 @@ ngx_http_upstream_reinit(ngx_http_reques
return NGX_ERROR;
}

+ u->keepalive = 0;
+
ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
+ u->headers_in.content_length_n = -1;

if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
@@ -1922,14 +1935,9 @@ ngx_http_upstream_process_headers(ngx_ht
r->headers_out.status = u->headers_in.status_n;
r->headers_out.status_line = u->headers_in.status_line;

- u->headers_in.content_length_n = r->headers_out.content_length_n;
-
- if (r->headers_out.content_length_n != -1) {
- u->length = (size_t) r->headers_out.content_length_n;
-
- } else {
- u->length = NGX_MAX_SIZE_T_VALUE;
- }
+ r->headers_out.content_length_n = u->headers_in.content_length_n;
+
+ u->length = u->headers_in.content_length_n;

return NGX_OK;
}
@@ -1993,6 +2001,11 @@ ngx_http_upstream_process_body_in_memory
}
}

+ if (u->length == 0) {
+ ngx_http_upstream_finalize_request(r, u, 0);
+ return;
+ }
+
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
@@ -2294,6 +2307,15 @@ ngx_http_upstream_send_response(ngx_http
u->read_event_handler = ngx_http_upstream_process_upstream;
r->write_event_handler = ngx_http_upstream_process_downstream;

+ p->length = -1;
+
+ if (u->input_filter_init
+ && u->input_filter_init(p->input_ctx) != NGX_OK)
+ {
+ ngx_http_upstream_finalize_request(r, u, 0);
+ return;
+ }
+
ngx_http_upstream_process_upstream(r, u);
}

@@ -2386,6 +2408,10 @@ ngx_http_upstream_process_non_buffered_r

if (u->busy_bufs == NULL) {

+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http upstream non buffered length:%O",
+ u->length);
+
if (u->length == 0
|| upstream->read->eof
|| upstream->read->error)
@@ -2401,10 +2427,6 @@ ngx_http_upstream_process_non_buffered_r

size = b->end - b->last;

- if (size > u->length) {
- size = u->length;
- }
-
if (size && upstream->read->ready) {

n = upstream->recv(upstream, b->last, size);
@@ -2501,12 +2523,16 @@ ngx_http_upstream_non_buffered_filter(vo
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;

- if (u->length == NGX_MAX_SIZE_T_VALUE) {
+ if (u->length == -1) {
return NGX_OK;
}

u->length -= bytes;

+ if (u->length == 0) {
+ u->keepalive = 1;
+ }
+
return NGX_OK;
}

@@ -3057,6 +3083,21 @@ ngx_http_upstream_ignore_header_line(ngx


static ngx_int_t
+ngx_http_upstream_process_content_length(ngx_http_request_t *r,
+ ngx_table_elt_t *h, ngx_uint_t offset)
+{
+ ngx_http_upstream_t *u;
+
+ u = r->upstream;
+
+ u->headers_in.content_length = h;
+ u->headers_in.content_length_n = ngx_atoof(h->value.data, h->value.len);
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_http_upstream_process_set_cookie(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
@@ -3318,6 +3359,23 @@ ngx_http_upstream_process_charset(ngx_ht


static ngx_int_t
+ngx_http_upstream_process_transfer_encoding(ngx_http_request_t *r,
+ ngx_table_elt_t *h, ngx_uint_t offset)
+{
+ r->upstream->headers_in.transfer_encoding = h;
+
+ if (ngx_strlcasestrn(h->value.data, h->value.data + h->value.len,
+ (u_char *) "chunked", 7 - 1)
+ != NULL)
+ {
+ r->upstream->headers_in.chunked = 1;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_http_upstream_copy_header_line(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
@@ -3425,26 +3483,6 @@ ngx_http_upstream_copy_content_type(ngx_


static ngx_int_t
-ngx_http_upstream_copy_content_length(ngx_http_request_t *r, ngx_table_elt_t *h,
- ngx_uint_t offset)
-{
- ngx_table_elt_t *ho;
-
- ho = ngx_list_push(&r->headers_out.headers);
- if (ho == NULL) {
- return NGX_ERROR;
- }
-
- *ho = *h;
-
- r->headers_out.content_length = ho;
- r->headers_out.content_length_n = ngx_atoof(h->value.data, h->value.len);
-
- return NGX_OK;
-}
-
-
-static ngx_int_t
ngx_http_upstream_copy_last_modified(ngx_http_request_t *r, ngx_table_elt_t *h,
ngx_uint_t offset)
{
diff --git a/src/http/ngx_http_upstream.h b/src/http/ngx_http_upstream.h
--- a/src/http/ngx_http_upstream.h
+++ b/src/http/ngx_http_upstream.h
@@ -216,6 +216,7 @@ typedef struct {
ngx_table_elt_t *location;
ngx_table_elt_t *accept_ranges;
ngx_table_elt_t *www_authenticate;
+ ngx_table_elt_t *transfer_encoding;

#if (NGX_HTTP_GZIP)
ngx_table_elt_t *content_encoding;
@@ -224,6 +225,8 @@ typedef struct {
off_t content_length_n;

ngx_array_t cache_control;
+
+ unsigned chunked:1;
} ngx_http_upstream_headers_in_t;


@@ -266,7 +269,7 @@ struct ngx_http_upstream_s {
ngx_http_upstream_resolved_t *resolved;

ngx_buf_t buffer;
- size_t length;
+ off_t length;

ngx_chain_t *out_bufs;
ngx_chain_t *busy_bufs;
@@ -307,6 +310,7 @@ struct ngx_http_upstream_s {
#endif

unsigned buffering:1;
+ unsigned keepalive:1;

unsigned request_sent:1;
unsigned header_sent:1;
_______________________________________________
nginx-devel mailing list
nginx-devel@nginx.org
http://mailman.nginx.org/mailman/listinfo/nginx-devel
Subject Author Views Posted

upstream keepalive: call for testing

Maxim Dounin 2973 July 26, 2011 07:58AM

Re: upstream keepalive: call for testing

Gena Makhomed 791 July 26, 2011 09:06AM

Re: upstream keepalive: call for testing

Maxim Dounin 806 July 26, 2011 09:10AM

Re: upstream keepalive: call for testing

Thomas Love 821 July 27, 2011 09:44AM

Re: upstream keepalive: call for testing

Maxim Dounin 833 July 27, 2011 10:40AM

Re: upstream keepalive: call for testing

Brian Pane 861 July 28, 2011 12:14AM

Re: upstream keepalive: call for testing

Igor Sysoev 1061 July 28, 2011 02:46AM

Re: upstream keepalive: call for testing

Thomas Love 792 July 29, 2011 09:46AM

Re: upstream keepalive: call for testing

Maxim Dounin 806 July 29, 2011 11:38AM

Re: upstream keepalive: call for testing

splitice 1648 July 29, 2011 11:56AM

Re: upstream keepalive: call for testing

Maxim Dounin 804 July 29, 2011 12:16PM

Re: upstream keepalive: call for testing

splitice 1064 July 29, 2011 12:46PM

Re: upstream keepalive: call for testing

Igor Sysoev 907 July 30, 2011 01:54PM

Re: upstream keepalive: call for testing

Thomas Love 825 August 02, 2011 11:10AM

Re: upstream keepalive: call for testing

Maxim Dounin 1112 August 02, 2011 11:28AM

Re: upstream keepalive: call for testing

Srebrenko Šehić 810 July 30, 2011 01:48PM

Re: upstream keepalive: call for testing

Maxim Dounin 849 July 30, 2011 05:04PM



Sorry, you do not have permission to post/reply in this forum.

Online Users

Guests: 258
Record Number of Users: 8 on April 13, 2023
Record Number of Guests: 421 on December 02, 2018
Powered by nginx      Powered by FreeBSD      PHP Powered      Powered by MariaDB      ipv6 ready