Welcome! Log In Create A New Profile

Advanced

[njs] Stream: improved s.send() with async callbacks.

Dmitry Volyntsev
August 23, 2022 11:18PM
details: https://hg.nginx.org/njs/rev/b33aae5e8dc6
branches:
changeset: 1933:b33aae5e8dc6
user: Dmitry Volyntsev <xeioex@nginx.com>
date: Tue Aug 23 19:36:16 2022 -0700
description:
Stream: improved s.send() with async callbacks.

Previously, s.send() was a context-dependent method because the
direction it was sending data to was determined by the callback (upstream
or downstream) it was called from. This works for the synchronous
callbacks it was originally designed for, but fails with async functions
(e.g. ngx.fetch()).

The fix is to store the direction in which the data is flowing as a
separate flag that s.send() can use.

diffstat:

nginx/ngx_stream_js_module.c | 91 ++++++++++++++++++++++++++++++++++++++-----
1 files changed, 79 insertions(+), 12 deletions(-)

diffs (186 lines):

diff -r 2f0056631e75 -r b33aae5e8dc6 nginx/ngx_stream_js_module.c
--- a/nginx/ngx_stream_js_module.c Mon Aug 22 22:03:15 2022 -0700
+++ b/nginx/ngx_stream_js_module.c Tue Aug 23 19:36:16 2022 -0700
@@ -83,6 +83,8 @@ static ngx_int_t ngx_stream_js_phase_han
ngx_str_t *name);
static ngx_int_t ngx_stream_js_body_filter(ngx_stream_session_t *s,
ngx_chain_t *in, ngx_uint_t from_upstream);
+static ngx_int_t ngx_stream_js_next_filter(ngx_stream_session_t *s,
+ ngx_stream_js_ctx_t *ctx, ngx_chain_t *out, ngx_uint_t from_upstream);
static ngx_int_t ngx_stream_js_variable_set(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
@@ -92,7 +94,8 @@ static void ngx_stream_js_drop_events(ng
static void ngx_stream_js_cleanup(void *data);
static void ngx_stream_js_cleanup_vm(void *data);
static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
- ngx_stream_js_ctx_t *ctx, ngx_stream_js_ev_t *event);
+ ngx_stream_js_ctx_t *ctx, ngx_stream_js_ev_t *event,
+ ngx_uint_t from_upstream);
static njs_vm_event_t *ngx_stream_js_event(ngx_stream_session_t *s,
njs_str_t *event);

@@ -514,6 +517,17 @@ static njs_external_t ngx_stream_js_ext
}
},

+ {
+ .flags = NJS_EXTERN_PROPERTY,
+ .name.string = njs_str("from_upstream"),
+ .enumerable = 1,
+ .u.property = {
+ .handler = ngx_js_ext_flags,
+ .magic16 = NGX_JS_BOOLEAN,
+ .magic32 = 0x00000002,
+ }
+ },
+
};


@@ -620,7 +634,7 @@ ngx_stream_js_phase_handler(ngx_stream_s
}
}

- ret = ngx_stream_js_run_event(s, ctx, &ctx->events[NGX_JS_EVENT_UPLOAD]);
+ ret = ngx_stream_js_run_event(s, ctx, &ctx->events[NGX_JS_EVENT_UPLOAD], 0);
if (ret != NJS_OK) {
ngx_js_retval(ctx->vm, NULL, &exception);

@@ -655,11 +669,11 @@ static ngx_int_t
ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
ngx_uint_t from_upstream)
{
+ ngx_int_t rc;
ngx_str_t exception;
njs_int_t ret;
- ngx_int_t rc;
- ngx_chain_t *out, *cl, **busy;
- ngx_connection_t *c, *dst;
+ ngx_chain_t *out, *cl;
+ ngx_connection_t *c;
ngx_stream_js_ev_t *event;
ngx_stream_js_ctx_t *ctx;
ngx_stream_js_srv_conf_t *jscf;
@@ -704,7 +718,7 @@ ngx_stream_js_body_filter(ngx_stream_ses
event = ngx_stream_event(from_upstream);

if (event->ev != NULL) {
- ret = ngx_stream_js_run_event(s, ctx, event);
+ ret = ngx_stream_js_run_event(s, ctx, event, from_upstream);
if (ret != NJS_OK) {
ngx_js_retval(ctx->vm, NULL, &exception);

@@ -731,8 +745,23 @@ ngx_stream_js_body_filter(ngx_stream_ses
in = in->next;
}

+ ctx->buf = NULL;
*ctx->last_out = NULL;

+ return ngx_stream_js_next_filter(s, ctx, out, from_upstream);
+}
+
+
+static ngx_int_t
+ngx_stream_js_next_filter(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
+ ngx_chain_t *out, ngx_uint_t from_upstream)
+{
+ ngx_int_t rc;
+ ngx_chain_t **busy;
+ ngx_connection_t *c, *dst;
+
+ c = s->connection;
+
if (from_upstream) {
dst = c;
busy = &ctx->downstream_busy;
@@ -946,7 +975,7 @@ ngx_stream_js_cleanup_vm(void *data)

static njs_int_t
ngx_stream_js_run_event(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
- ngx_stream_js_ev_t *event)
+ ngx_stream_js_ev_t *event, ngx_uint_t from_upstream)
{
size_t len;
u_char *p;
@@ -980,7 +1009,7 @@ ngx_stream_js_run_event(ngx_stream_sessi
return ret;
}

- flags = b && b->last_buf;
+ flags = from_upstream << 1 | (uintptr_t) (b && b->last_buf);

ret = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[2]),
ngx_stream_js_session_flags_proto_id, (void *) flags, 0);
@@ -1249,6 +1278,7 @@ ngx_stream_js_ext_send(njs_vm_t *vm, njs

static const njs_str_t last_key = njs_str("last");
static const njs_str_t flush_key = njs_str("flush");
+ static const njs_str_t from_key = njs_str("from_upstream");

s = njs_vm_external(vm, ngx_stream_js_session_proto_id,
njs_argument(args, 0));
@@ -1271,8 +1301,19 @@ ngx_stream_js_ext_send(njs_vm_t *vm, njs
return NJS_ERROR;
}

- flush = ctx->buf->flush;
- last_buf = ctx->buf->last_buf;
+ /*
+ * ctx->buf != NULL when s.send() is called while processing incoming
+ * data chunks, otherwise s.send() is called asynchronously
+ */
+
+ if (ctx->buf != NULL) {
+ flush = ctx->buf->flush;
+ last_buf = ctx->buf->last_buf;
+
+ } else {
+ flush = 0;
+ last_buf = 0;
+ }

flags = njs_arg(args, nargs, 2);

@@ -1308,12 +1349,38 @@ ngx_stream_js_ext_send(njs_vm_t *vm, njs
b->pos = b->start;
b->last = b->end;

- *ctx->last_out = cl;
- ctx->last_out = &cl->next;
+ if (ctx->buf != NULL) {
+ *ctx->last_out = cl;
+ ctx->last_out = &cl->next;
+
+ } else {
+ if (!njs_value_is_object(flags)) {
+ goto exception;
+ }
+
+ value = njs_vm_object_prop(vm, flags, &from_key, &lvalue);
+ if (value == NULL) {
+ goto exception;
+ }
+
+ if (ngx_stream_js_next_filter(s, ctx, cl, njs_value_bool(value))
+ == NGX_ERROR)
+ {
+ njs_vm_error(vm, "ngx_stream_js_next_filter() failed");
+ return NJS_ERROR;
+ }
+ }

njs_value_undefined_set(njs_vm_retval(vm));

return NJS_OK;
+
+exception:
+
+ njs_vm_error(vm, "\"from_upstream\" flag is expected when"
+ "called asynchronously");
+
+ return NJS_ERROR;
}


_______________________________________________
nginx-devel mailing list -- nginx-devel@nginx.org
To unsubscribe send an email to nginx-devel-leave@nginx.org
Subject Author Views Posted

[njs] Stream: improved s.send() with async callbacks.

Dmitry Volyntsev 336 August 23, 2022 11:18PM



Sorry, you do not have permission to post/reply in this forum.

Online Users

Guests: 158
Record Number of Users: 8 on April 13, 2023
Record Number of Guests: 421 on December 02, 2018
Powered by nginx      Powered by FreeBSD      PHP Powered      Powered by MariaDB      ipv6 ready