Welcome! Log In Create A New Profile

Advanced

[nginx] Stream: filters.

Vladimir Homutov
September 15, 2016 07:58AM
details: http://hg.nginx.org/nginx/rev/56fc55e32f23
branches:
changeset: 6692:56fc55e32f23
user: Roman Arutyunyan <arut@nginx.com>
date: Thu Sep 15 14:55:46 2016 +0300
description:
Stream: filters.

diffstat:

auto/modules | 6 +-
auto/sources | 1 +
src/event/modules/ngx_iocp_module.c | 2 +
src/event/ngx_event.h | 1 +
src/event/ngx_event_accept.c | 1 +
src/event/ngx_event_connect.c | 1 +
src/os/unix/ngx_darwin_init.c | 1 +
src/os/unix/ngx_freebsd_init.c | 1 +
src/os/unix/ngx_linux_init.c | 1 +
src/os/unix/ngx_os.h | 3 +
src/os/unix/ngx_posix_init.c | 1 +
src/os/unix/ngx_solaris_init.c | 1 +
src/os/unix/ngx_udp_sendmsg_chain.c | 245 +++++++++++++++++++++++++
src/os/win32/ngx_os.h | 2 +
src/os/win32/ngx_win32_init.c | 2 +
src/stream/ngx_stream.c | 3 +
src/stream/ngx_stream.h | 10 +
src/stream/ngx_stream_handler.c | 4 +
src/stream/ngx_stream_proxy_module.c | 228 ++++++++++++++--------
src/stream/ngx_stream_return_module.c | 55 +++--
src/stream/ngx_stream_upstream.h | 10 +
src/stream/ngx_stream_write_filter_module.c | 273 ++++++++++++++++++++++++++++
22 files changed, 744 insertions(+), 108 deletions(-)

diffs (truncated from 1269 to 1000 lines):

diff -r 4bce3edfac2c -r 56fc55e32f23 auto/modules
--- a/auto/modules Thu Sep 15 14:56:26 2016 +0300
+++ b/auto/modules Thu Sep 15 14:55:46 2016 +0300
@@ -973,7 +973,8 @@ if [ $STREAM != NO ]; then
ngx_stream_core_module \
ngx_stream_log_module \
ngx_stream_proxy_module \
- ngx_stream_upstream_module"
+ ngx_stream_upstream_module \
+ ngx_stream_write_filter_module"
ngx_module_incs="src/stream"
ngx_module_deps="src/stream/ngx_stream.h \
src/stream/ngx_stream_variables.h \
@@ -988,7 +989,8 @@ if [ $STREAM != NO ]; then
src/stream/ngx_stream_log_module.c \
src/stream/ngx_stream_proxy_module.c \
src/stream/ngx_stream_upstream.c \
- src/stream/ngx_stream_upstream_round_robin.c"
+ src/stream/ngx_stream_upstream_round_robin.c \
+ src/stream/ngx_stream_write_filter_module.c"

. auto/module

diff -r 4bce3edfac2c -r 56fc55e32f23 auto/sources
--- a/auto/sources Thu Sep 15 14:56:26 2016 +0300
+++ b/auto/sources Thu Sep 15 14:55:46 2016 +0300
@@ -167,6 +167,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
src/os/unix/ngx_send.c \
src/os/unix/ngx_writev_chain.c \
src/os/unix/ngx_udp_send.c \
+ src/os/unix/ngx_udp_sendmsg_chain.c \
src/os/unix/ngx_channel.c \
src/os/unix/ngx_shmem.c \
src/os/unix/ngx_process.c \
diff -r 4bce3edfac2c -r 56fc55e32f23 src/event/modules/ngx_iocp_module.c
--- a/src/event/modules/ngx_iocp_module.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/event/modules/ngx_iocp_module.c Thu Sep 15 14:55:46 2016 +0300
@@ -93,6 +93,8 @@ ngx_os_io_t ngx_iocp_io = {
NULL,
ngx_udp_overlapped_wsarecv,
NULL,
+ NULL,
+ NULL,
ngx_overlapped_wsasend_chain,
0
};
diff -r 4bce3edfac2c -r 56fc55e32f23 src/event/ngx_event.h
--- a/src/event/ngx_event.h Thu Sep 15 14:56:26 2016 +0300
+++ b/src/event/ngx_event.h Thu Sep 15 14:55:46 2016 +0300
@@ -430,6 +430,7 @@ extern ngx_os_io_t ngx_io;
#define ngx_send ngx_io.send
#define ngx_send_chain ngx_io.send_chain
#define ngx_udp_send ngx_io.udp_send
+#define ngx_udp_send_chain ngx_io.udp_send_chain


#define NGX_EVENT_MODULE 0x544E5645 /* "EVNT" */
diff -r 4bce3edfac2c -r 56fc55e32f23 src/event/ngx_event_accept.c
--- a/src/event/ngx_event_accept.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/event/ngx_event_accept.c Thu Sep 15 14:55:46 2016 +0300
@@ -467,6 +467,7 @@ ngx_event_recvmsg(ngx_event_t *ev)
*log = ls->log;

c->send = ngx_udp_send;
+ c->send_chain = ngx_udp_send_chain;

c->log = log;
c->pool->log = log;
diff -r 4bce3edfac2c -r 56fc55e32f23 src/event/ngx_event_connect.c
--- a/src/event/ngx_event_connect.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/event/ngx_event_connect.c Thu Sep 15 14:55:46 2016 +0300
@@ -166,6 +166,7 @@ ngx_event_connect_peer(ngx_peer_connecti
} else { /* type == SOCK_DGRAM */
c->recv = ngx_udp_recv;
c->send = ngx_send;
+ c->send_chain = ngx_udp_send_chain;
}

c->log_error = pc->log_error;
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_darwin_init.c
--- a/src/os/unix/ngx_darwin_init.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/unix/ngx_darwin_init.c Thu Sep 15 14:55:46 2016 +0300
@@ -24,6 +24,7 @@ static ngx_os_io_t ngx_darwin_io = {
ngx_udp_unix_recv,
ngx_unix_send,
ngx_udp_unix_send,
+ ngx_udp_unix_sendmsg_chain,
#if (NGX_HAVE_SENDFILE)
ngx_darwin_sendfile_chain,
NGX_IO_SENDFILE
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_freebsd_init.c
--- a/src/os/unix/ngx_freebsd_init.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/unix/ngx_freebsd_init.c Thu Sep 15 14:55:46 2016 +0300
@@ -33,6 +33,7 @@ static ngx_os_io_t ngx_freebsd_io = {
ngx_udp_unix_recv,
ngx_unix_send,
ngx_udp_unix_send,
+ ngx_udp_unix_sendmsg_chain,
#if (NGX_HAVE_SENDFILE)
ngx_freebsd_sendfile_chain,
NGX_IO_SENDFILE
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_linux_init.c
--- a/src/os/unix/ngx_linux_init.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/unix/ngx_linux_init.c Thu Sep 15 14:55:46 2016 +0300
@@ -19,6 +19,7 @@ static ngx_os_io_t ngx_linux_io = {
ngx_udp_unix_recv,
ngx_unix_send,
ngx_udp_unix_send,
+ ngx_udp_unix_sendmsg_chain,
#if (NGX_HAVE_SENDFILE)
ngx_linux_sendfile_chain,
NGX_IO_SENDFILE
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_os.h
--- a/src/os/unix/ngx_os.h Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/unix/ngx_os.h Thu Sep 15 14:55:46 2016 +0300
@@ -29,6 +29,7 @@ typedef struct {
ngx_recv_pt udp_recv;
ngx_send_pt send;
ngx_send_pt udp_send;
+ ngx_send_chain_pt udp_send_chain;
ngx_send_chain_pt send_chain;
ngx_uint_t flags;
} ngx_os_io_t;
@@ -49,6 +50,8 @@ ssize_t ngx_unix_send(ngx_connection_t *
ngx_chain_t *ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in,
off_t limit);
ssize_t ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size);
+ngx_chain_t *ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in,
+ off_t limit);


#if (IOV_MAX > 64)
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_posix_init.c
--- a/src/os/unix/ngx_posix_init.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/unix/ngx_posix_init.c Thu Sep 15 14:55:46 2016 +0300
@@ -25,6 +25,7 @@ ngx_os_io_t ngx_os_io = {
ngx_udp_unix_recv,
ngx_unix_send,
ngx_udp_unix_send,
+ ngx_udp_unix_sendmsg_chain,
ngx_writev_chain,
0
};
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_solaris_init.c
--- a/src/os/unix/ngx_solaris_init.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/unix/ngx_solaris_init.c Thu Sep 15 14:55:46 2016 +0300
@@ -20,6 +20,7 @@ static ngx_os_io_t ngx_solaris_io = {
ngx_udp_unix_recv,
ngx_unix_send,
ngx_udp_unix_send,
+ ngx_udp_unix_sendmsg_chain,
#if (NGX_HAVE_SENDFILE)
ngx_solaris_sendfilev_chain,
NGX_IO_SENDFILE
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/unix/ngx_udp_sendmsg_chain.c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/os/unix/ngx_udp_sendmsg_chain.c Thu Sep 15 14:55:46 2016 +0300
@@ -0,0 +1,245 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+
+
+static ngx_chain_t *ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec,
+ ngx_chain_t *in, ngx_log_t *log);
+static ssize_t ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec);
+
+
+ngx_chain_t *
+ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+{
+ ssize_t n;
+ off_t send;
+ ngx_chain_t *cl;
+ ngx_event_t *wev;
+ ngx_iovec_t vec;
+ struct iovec iovs[NGX_IOVS_PREALLOCATE];
+
+ wev = c->write;
+
+ if (!wev->ready) {
+ return in;
+ }
+
+#if (NGX_HAVE_KQUEUE)
+
+ if ((ngx_event_flags & NGX_USE_KQUEUE_EVENT) && wev->pending_eof) {
+ (void) ngx_connection_error(c, wev->kq_errno,
+ "kevent() reported about an closed connection");
+ wev->error = 1;
+ return NGX_CHAIN_ERROR;
+ }
+
+#endif
+
+ /* the maximum limit size is the maximum size_t value - the page size */
+
+ if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
+ limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;
+ }
+
+ send = 0;
+
+ vec.iovs = iovs;
+ vec.nalloc = NGX_IOVS_PREALLOCATE;
+
+ for ( ;; ) {
+
+ /* create the iovec and coalesce the neighbouring bufs */
+
+ cl = ngx_udp_output_chain_to_iovec(&vec, in, c->log);
+
+ if (cl == NGX_CHAIN_ERROR) {
+ return NGX_CHAIN_ERROR;
+ }
+
+ if (cl && cl->buf->in_file) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "file buf in sendmsg "
+ "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+ cl->buf->temporary,
+ cl->buf->recycled,
+ cl->buf->in_file,
+ cl->buf->start,
+ cl->buf->pos,
+ cl->buf->last,
+ cl->buf->file,
+ cl->buf->file_pos,
+ cl->buf->file_last);
+
+ ngx_debug_point();
+
+ return NGX_CHAIN_ERROR;
+ }
+
+ if (cl == in) {
+ return in;
+ }
+
+ send += vec.size;
+
+ n = ngx_sendmsg(c, &vec);
+
+ if (n == NGX_ERROR) {
+ return NGX_CHAIN_ERROR;
+ }
+
+ if (n == NGX_AGAIN) {
+ wev->ready = 0;
+ return in;
+ }
+
+ c->sent += n;
+
+ in = ngx_chain_update_sent(in, n);
+
+ if (send >= limit || in == NULL) {
+ return in;
+ }
+ }
+}
+
+
+static ngx_chain_t *
+ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *in, ngx_log_t *log)
+{
+ size_t total, size;
+ u_char *prev;
+ ngx_uint_t n, flush;
+ ngx_chain_t *cl;
+ struct iovec *iov;
+
+ cl = in;
+ iov = NULL;
+ prev = NULL;
+ total = 0;
+ n = 0;
+ flush = 0;
+
+ for ( /* void */ ; in && !flush; in = in->next) {
+
+ if (in->buf->flush || in->buf->last_buf) {
+ flush = 1;
+ }
+
+ if (ngx_buf_special(in->buf)) {
+ continue;
+ }
+
+ if (in->buf->in_file) {
+ break;
+ }
+
+ if (!ngx_buf_in_memory(in->buf)) {
+ ngx_log_error(NGX_LOG_ALERT, log, 0,
+ "bad buf in output chain "
+ "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+ in->buf->temporary,
+ in->buf->recycled,
+ in->buf->in_file,
+ in->buf->start,
+ in->buf->pos,
+ in->buf->last,
+ in->buf->file,
+ in->buf->file_pos,
+ in->buf->file_last);
+
+ ngx_debug_point();
+
+ return NGX_CHAIN_ERROR;
+ }
+
+ size = in->buf->last - in->buf->pos;
+
+ if (prev == in->buf->pos) {
+ iov->iov_len += size;
+
+ } else {
+ if (n == vec->nalloc) {
+ ngx_log_error(NGX_LOG_ALERT, log, 0,
+ "too many parts in a datagram");
+ return NGX_CHAIN_ERROR;
+ }
+
+ iov = &vec->iovs[n++];
+
+ iov->iov_base = (void *) in->buf->pos;
+ iov->iov_len = size;
+ }
+
+ prev = in->buf->pos + size;
+ total += size;
+ }
+
+ if (!flush) {
+#if (NGX_SUPPRESS_WARN)
+ vec->size = 0;
+ vec->count = 0;
+#endif
+ return cl;
+ }
+
+ vec->count = n;
+ vec->size = total;
+
+ return in;
+}
+
+
+static ssize_t
+ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec)
+{
+ ssize_t n;
+ ngx_err_t err;
+ struct msghdr msg;
+
+ ngx_memzero(&msg, sizeof(struct msghdr));
+
+ if (c->socklen) {
+ msg.msg_name = c->sockaddr;
+ msg.msg_namelen = c->socklen;
+ }
+
+ msg.msg_iov = vec->iovs;
+ msg.msg_iovlen = vec->count;
+
+eintr:
+
+ n = sendmsg(c->fd, &msg, 0);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "sendmsg: %z of %uz", n, vec->size);
+
+ if (n == -1) {
+ err = ngx_errno;
+
+ switch (err) {
+ case NGX_EAGAIN:
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
+ "sendmsg() not ready");
+ return NGX_AGAIN;
+
+ case NGX_EINTR:
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
+ "sendmsg() was interrupted");
+ goto eintr;
+
+ default:
+ c->write->error = 1;
+ ngx_connection_error(c, err, "sendmsg() failed");
+ return NGX_ERROR;
+ }
+ }
+
+ return n;
+}
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/win32/ngx_os.h
--- a/src/os/win32/ngx_os.h Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/win32/ngx_os.h Thu Sep 15 14:55:46 2016 +0300
@@ -28,6 +28,8 @@ typedef struct {
ngx_recv_chain_pt recv_chain;
ngx_recv_pt udp_recv;
ngx_send_pt send;
+ ngx_send_pt udp_send;
+ ngx_send_chain_pt udp_send_chain;
ngx_send_chain_pt send_chain;
ngx_uint_t flags;
} ngx_os_io_t;
diff -r 4bce3edfac2c -r 56fc55e32f23 src/os/win32/ngx_win32_init.c
--- a/src/os/win32/ngx_win32_init.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/os/win32/ngx_win32_init.c Thu Sep 15 14:55:46 2016 +0300
@@ -25,6 +25,8 @@ ngx_os_io_t ngx_os_io = {
ngx_wsarecv_chain,
ngx_udp_wsarecv,
ngx_wsasend,
+ NULL,
+ NULL,
ngx_wsasend_chain,
0
};
diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream.c
--- a/src/stream/ngx_stream.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/stream/ngx_stream.c Thu Sep 15 14:55:46 2016 +0300
@@ -27,6 +27,9 @@ static ngx_int_t ngx_stream_cmp_conf_add
ngx_uint_t ngx_stream_max_module;


+ngx_stream_filter_pt ngx_stream_top_filter;
+
+
static ngx_command_t ngx_stream_commands[] = {

{ ngx_string("stream"),
diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream.h
--- a/src/stream/ngx_stream.h Thu Sep 15 14:56:26 2016 +0300
+++ b/src/stream/ngx_stream.h Thu Sep 15 14:55:46 2016 +0300
@@ -243,6 +243,9 @@ typedef struct {
NULL)


+#define NGX_STREAM_WRITE_BUFFERED 0x10
+
+
void ngx_stream_init_connection(ngx_connection_t *c);
void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc);

@@ -252,4 +255,11 @@ extern ngx_uint_t ngx_stream_max_modu
extern ngx_module_t ngx_stream_core_module;


+typedef ngx_int_t (*ngx_stream_filter_pt)(ngx_stream_session_t *s,
+ ngx_chain_t *chain, ngx_uint_t from_upstream);
+
+
+extern ngx_stream_filter_pt ngx_stream_top_filter;
+
+
#endif /* _NGX_STREAM_H_INCLUDED_ */
diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream_handler.c
--- a/src/stream/ngx_stream_handler.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/stream/ngx_stream_handler.c Thu Sep 15 14:55:46 2016 +0300
@@ -134,6 +134,10 @@ ngx_stream_init_connection(ngx_connectio
s->ssl = addr_conf->ssl;
#endif

+ if (c->buffer) {
+ s->received += c->buffer->last - c->buffer->pos;
+ }
+
s->connection = c;
c->data = s;

diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream_proxy_module.c
--- a/src/stream/ngx_stream_proxy_module.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/stream/ngx_stream_proxy_module.c Thu Sep 15 14:55:46 2016 +0300
@@ -84,10 +84,10 @@ static char *ngx_stream_proxy_pass(ngx_c
void *conf);
static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
+
+#if (NGX_STREAM_SSL)
+
static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
-
-#if (NGX_STREAM_SSL)
-
static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf);
static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
@@ -385,8 +385,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
}

u->peer.type = c->type;
-
- u->proxy_protocol = pscf->proxy_protocol;
u->start_sec = ngx_time();

c->write->handler = ngx_stream_proxy_downstream_handler;
@@ -411,28 +409,6 @@ ngx_stream_proxy_handler(ngx_stream_sess
u->downstream_buf.pos = p;
u->downstream_buf.last = p;

- if (u->proxy_protocol
-#if (NGX_STREAM_SSL)
- && pscf->ssl == NULL
-#endif
- && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER)
- {
- /* optimization for a typical case */
-
- ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
- "stream proxy send PROXY protocol header");
-
- p = ngx_proxy_protocol_write(c, u->downstream_buf.last,
- u->downstream_buf.end);
- if (p == NULL) {
- ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
- return;
- }
-
- u->downstream_buf.last = p;
- u->proxy_protocol = 0;
- }
-
if (c->read->ready) {
ngx_post_event(c->read, &ngx_posted_events);
}
@@ -682,8 +658,13 @@ ngx_stream_proxy_connect(ngx_stream_sess

c->log->action = "connecting to upstream";

+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
u = s->upstream;

+ u->connected = 0;
+ u->proxy_protocol = pscf->proxy_protocol;
+
if (u->state) {
u->state->response_time = ngx_current_msec - u->state->response_time;
}
@@ -740,8 +721,6 @@ ngx_stream_proxy_connect(ngx_stream_sess
pc->read->handler = ngx_stream_proxy_connect_handler;
pc->write->handler = ngx_stream_proxy_connect_handler;

- pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
-
ngx_add_timer(pc->write, pscf->connect_timeout);
}

@@ -751,6 +730,7 @@ ngx_stream_proxy_init_upstream(ngx_strea
{
int tcp_nodelay;
u_char *p;
+ ngx_chain_t *cl;
ngx_connection_t *c, *pc;
ngx_log_handler_pt handler;
ngx_stream_upstream_t *u;
@@ -782,21 +762,26 @@ ngx_stream_proxy_init_upstream(ngx_strea
pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
}

- if (u->proxy_protocol) {
- if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+#if (NGX_STREAM_SSL)
+
+ if (pc->type == SOCK_STREAM && pscf->ssl) {
+
+ if (u->proxy_protocol) {
+ if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
+ return;
+ }
+
+ u->proxy_protocol = 0;
+ }
+
+ if (pc->ssl == NULL) {
+ ngx_stream_proxy_ssl_init_connection(s);
return;
}
-
- u->proxy_protocol = 0;
}

- pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
-
-#if (NGX_STREAM_SSL)
- if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
- ngx_stream_proxy_ssl_init_connection(s);
- return;
- }
#endif

c = s->connection;
@@ -838,14 +823,66 @@ ngx_stream_proxy_init_upstream(ngx_strea
u->upstream_buf.last = p;
}

- if (c->type == SOCK_DGRAM) {
- s->received = c->buffer->last - c->buffer->pos;
- u->downstream_buf = *c->buffer;
-
- if (pscf->responses == 0) {
- pc->read->ready = 0;
- pc->read->eof = 1;
+ if (c->buffer && c->buffer->pos < c->buffer->last) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream proxy add preread buffer: %uz",
+ c->buffer->last - c->buffer->pos);
+
+ cl = ngx_chain_get_free_buf(c->pool, &u->free);
+ if (cl == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
}
+
+ *cl->buf = *c->buffer;
+
+ cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+ cl->buf->flush = 1;
+ cl->buf->last_buf = (c->type == SOCK_DGRAM);
+
+ cl->next = u->upstream_out;
+ u->upstream_out = cl;
+ }
+
+ if (u->proxy_protocol) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream proxy add PROXY protocol header");
+
+ cl = ngx_chain_get_free_buf(c->pool, &u->free);
+ if (cl == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ cl->buf->pos = p;
+
+ p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ cl->buf->last = p;
+ cl->buf->temporary = 1;
+ cl->buf->flush = 0;
+ cl->buf->last_buf = 0;
+ cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+ cl->next = u->upstream_out;
+ u->upstream_out = cl;
+
+ u->proxy_protocol = 0;
+ }
+
+ if (c->type == SOCK_DGRAM && pscf->responses == 0) {
+ pc->read->ready = 0;
+ pc->read->eof = 1;
}

u->connected = 1;
@@ -861,6 +898,8 @@ ngx_stream_proxy_init_upstream(ngx_strea
}


+#if (NGX_STREAM_SSL)
+
static ngx_int_t
ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
{
@@ -931,8 +970,6 @@ ngx_stream_proxy_send_proxy_protocol(ngx
}


-#if (NGX_STREAM_SSL)
-
static char *
ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf)
@@ -1412,8 +1449,10 @@ ngx_stream_proxy_process(ngx_stream_sess
size_t size, limit_rate;
ssize_t n;
ngx_buf_t *b;
+ ngx_int_t rc;
ngx_uint_t flags;
ngx_msec_t delay;
+ ngx_chain_t *cl, **ll, **out, **busy;
ngx_connection_t *c, *pc, *src, *dst;
ngx_log_handler_pt handler;
ngx_stream_upstream_t *u;
@@ -1447,6 +1486,8 @@ ngx_stream_proxy_process(ngx_stream_sess
b = &u->upstream_buf;
limit_rate = pscf->download_rate;
received = &u->received;
+ out = &u->downstream_out;
+ busy = &u->downstream_busy;

} else {
src = c;
@@ -1454,24 +1495,18 @@ ngx_stream_proxy_process(ngx_stream_sess
b = &u->downstream_buf;
limit_rate = pscf->upload_rate;
received = &s->received;
+ out = &u->upstream_out;
+ busy = &u->upstream_busy;
}

for ( ;; ) {

- if (do_write) {
-
- size = b->last - b->pos;
-
- if (size && dst && dst->write->ready) {
-
- n = dst->send(dst, b->pos, size);
-
- if (n == NGX_AGAIN && dst->shared) {
- /* cannot wait on a shared socket */
- n = NGX_ERROR;
- }
-
- if (n == NGX_ERROR) {
+ if (do_write && dst) {
+
+ if (*out || *busy || dst->buffered) {
+ rc = ngx_stream_top_filter(s, *out, from_upstream);
+
+ if (rc == NGX_ERROR) {
if (c->type == SOCK_DGRAM && !from_upstream) {
ngx_stream_proxy_next_upstream(s);
return;
@@ -1481,13 +1516,12 @@ ngx_stream_proxy_process(ngx_stream_sess
return;
}

- if (n > 0) {
- b->pos += n;
-
- if (b->pos == b->last) {
- b->pos = b->start;
- b->last = b->start;
- }
+ ngx_chain_update_chains(c->pool, &u->free, busy, out,
+ (ngx_buf_tag_t) &ngx_stream_proxy_module);
+
+ if (*busy == NULL) {
+ b->pos = b->start;
+ b->last = b->start;
}
}
}
@@ -1514,11 +1548,21 @@ ngx_stream_proxy_process(ngx_stream_sess

n = src->recv(src, b->last, size);

- if (n == NGX_AGAIN || n == 0) {
+ if (n == NGX_AGAIN) {
break;
}

- if (n > 0) {
+ if (n == NGX_ERROR) {
+ if (c->type == SOCK_DGRAM && u->received == 0) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
+ src->read->eof = 1;
+ n = 0;
+ }
+
+ if (n >= 0) {
if (limit_rate) {
delay = (ngx_msec_t) (n * 1000 / limit_rate);

@@ -1541,27 +1585,37 @@ ngx_stream_proxy_process(ngx_stream_sess
src->read->eof = 1;
}

+ for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
+
+ cl = ngx_chain_get_free_buf(c->pool, &u->free);
+ if (cl == NULL) {
+ ngx_stream_proxy_finalize(s,
+ NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ *ll = cl;
+
+ cl->buf->pos = b->last;
+ cl->buf->last = b->last + n;
+ cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+ cl->buf->temporary = (n ? 1 : 0);
+ cl->buf->last_buf = src->read->eof;
+ cl->buf->flush = 1;
+
*received += n;
b->last += n;
do_write = 1;

continue;
}
-
- if (n == NGX_ERROR) {
- if (c->type == SOCK_DGRAM && u->received == 0) {
- ngx_stream_proxy_next_upstream(s);
- return;
- }
-
- src->read->eof = 1;
- }
}

break;
}

- if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
+ if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
handler = c->log->handler;
c->log->handler = NULL;

@@ -1614,6 +1668,14 @@ ngx_stream_proxy_next_upstream(ngx_strea
"stream proxy next upstream");

u = s->upstream;
+ pc = u->peer.connection;
+
+ if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) {
+ ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
+ "pending buffers on next upstream");
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }

if (u->peer.sockaddr) {
u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
@@ -1632,8 +1694,6 @@ ngx_stream_proxy_next_upstream(ngx_strea
return;
}

- pc = u->peer.connection;
-
if (pc) {
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"close proxy upstream connection: %d", pc->fd);
diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream_return_module.c
--- a/src/stream/ngx_stream_return_module.c Thu Sep 15 14:56:26 2016 +0300
+++ b/src/stream/ngx_stream_return_module.c Thu Sep 15 14:55:46 2016 +0300
@@ -11,12 +11,12 @@


typedef struct {
- ngx_stream_complex_value_t text;
+ ngx_stream_complex_value_t text;
} ngx_stream_return_srv_conf_t;


typedef struct {
- ngx_buf_t buf;
+ ngx_chain_t *out;
} ngx_stream_return_ctx_t;


@@ -72,6 +72,7 @@ static void
ngx_stream_return_handler(ngx_stream_session_t *s)
{
ngx_str_t text;
+ ngx_buf_t *b;
ngx_connection_t *c;
ngx_stream_return_ctx_t *ctx;
ngx_stream_return_srv_conf_t *rscf;
@@ -103,8 +104,25 @@ ngx_stream_return_handler(ngx_stream_ses

ngx_stream_set_ctx(s, ctx, ngx_stream_return_module);

- ctx->buf.pos = text.data;
- ctx->buf.last = text.data + text.len;
+ b = ngx_calloc_buf(c->pool);
+ if (b == NULL) {
+ ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ b->memory = 1;
+ b->pos = text.data;
+ b->last = text.data + text.len;
+ b->last_buf = 1;
+
+ ctx->out = ngx_alloc_chain_link(c->pool);
+ if (ctx->out == NULL) {
+ ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ ctx->out->buf = b;
+ ctx->out->next = NULL;

c->write->handler = ngx_stream_return_write_handler;

@@ -115,8 +133,6 @@ ngx_stream_return_handler(ngx_stream_ses
static void
ngx_stream_return_write_handler(ngx_event_t *ev)
{
- ssize_t n;
- ngx_buf_t *b;
ngx_connection_t *c;
ngx_stream_session_t *s;
ngx_stream_return_ctx_t *ctx;
@@ -130,25 +146,20 @@ ngx_stream_return_write_handler(ngx_even
return;
}

- if (ev->ready) {
- ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);
-
- b = &ctx->buf;
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);

- n = c->send(c, b->pos, b->last - b->pos);
- if (n == NGX_ERROR) {
- ngx_stream_finalize_session(s, NGX_STREAM_OK);
- return;
- }
+ if (ngx_stream_top_filter(s, ctx->out, 1) == NGX_ERROR) {
+ ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }

- if (n > 0) {
- b->pos += n;
+ ctx->out = NULL;

- if (b->pos == b->last) {
- ngx_stream_finalize_session(s, NGX_STREAM_OK);
- return;
- }
- }
+ if (!c->buffered) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream return done sending");
+ ngx_stream_finalize_session(s, NGX_STREAM_OK);
+ return;
}

if (ngx_handle_write_event(ev, 0) != NGX_OK) {
diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream_upstream.h
--- a/src/stream/ngx_stream_upstream.h Thu Sep 15 14:56:26 2016 +0300
+++ b/src/stream/ngx_stream_upstream.h Thu Sep 15 14:55:46 2016 +0300
@@ -106,14 +106,24 @@ typedef struct {

typedef struct {
ngx_peer_connection_t peer;
+
ngx_buf_t downstream_buf;
ngx_buf_t upstream_buf;
+
+ ngx_chain_t *free;
+ ngx_chain_t *upstream_out;
+ ngx_chain_t *upstream_busy;
+ ngx_chain_t *downstream_out;
+ ngx_chain_t *downstream_busy;
+
off_t received;
time_t start_sec;
ngx_uint_t responses;
+
#if (NGX_STREAM_SSL)
ngx_str_t ssl_name;
#endif
+
ngx_stream_upstream_resolved_t *resolved;
ngx_stream_upstream_state_t *state;
unsigned connected:1;
diff -r 4bce3edfac2c -r 56fc55e32f23 src/stream/ngx_stream_write_filter_module.c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/stream/ngx_stream_write_filter_module.c Thu Sep 15 14:55:46 2016 +0300
@@ -0,0 +1,273 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.

_______________________________________________
nginx-devel mailing list
nginx-devel@nginx.org
http://mailman.nginx.org/mailman/listinfo/nginx-devel
Subject Author Views Posted

[nginx] Stream: filters.

Vladimir Homutov 673 September 15, 2016 07:58AM



Sorry, you do not have permission to post/reply in this forum.

Online Users

Guests: 310
Record Number of Users: 8 on April 13, 2023
Record Number of Guests: 421 on December 02, 2018
Powered by nginx      Powered by FreeBSD      PHP Powered      Powered by MariaDB      ipv6 ready