Skip to content

Commit

Permalink
supported http/2 (part5.5): refactor IO operations to buf/chain API
Browse files Browse the repository at this point in the history
  • Loading branch information
chobits committed Jul 31, 2023
1 parent 357a2d4 commit 37c42ba
Showing 1 changed file with 312 additions and 12 deletions.
324 changes: 312 additions & 12 deletions ngx_http_proxy_connect_module.c
Expand Up @@ -69,6 +69,7 @@ struct ngx_http_proxy_connect_upstream_s {
ngx_buf_t from_client;

ngx_output_chain_ctx_t output;
ngx_chain_writer_ctx_t writer;

ngx_buf_t buffer;

Expand All @@ -81,6 +82,8 @@ struct ngx_http_proxy_connect_upstream_s {
ngx_msec_t start_time;

ngx_http_proxy_connect_upstream_state_t state;

unsigned request_body_blocked:1;
};

struct ngx_http_proxy_connect_address_s {
Expand Down Expand Up @@ -151,6 +154,8 @@ static void ngx_http_v2_proxy_connect_send_connection_established(ngx_http_reque
static ngx_int_t ngx_http_v2_proxy_connect_process_header(ngx_http_request_t *r);
ssize_t ngx_http_v2_proxy_connect_recv(ngx_connection_t *c, u_char *buf, size_t size);
ssize_t ngx_http_v2_proxy_connect_send(ngx_connection_t *c, u_char *buf, size_t size);
void ngx_http_v2_proxy_connect_process_upstream(ngx_http_request_t *r, ngx_uint_t do_write);
void ngx_http_v2_proxy_connect_process_downstream(ngx_http_request_t *r, ngx_uint_t do_write);
#endif


Expand Down Expand Up @@ -734,20 +739,26 @@ ngx_http_v2_proxy_connect_send_connection_established(ngx_http_request_t *r)
}
#else
{
ngx_http_request_body_t *rb;
/* init upstream output writer */

rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));
u->output.pool = r->pool;
u->output.bufs.num = 1;
u->output.bufs.size = u->conf->buffer_size;

if (rb == NULL) {
ngx_http_proxy_connect_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
if (u->output.output_filter == NULL) {
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
}

u->writer.pool = r->pool;
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = u->peer.connection;
u->writer.limit = 0;
}

rb->rest = -1;
rb->post_handler = ngx_http_proxy_connect_read_downstream;

r->request_body = rb;
{

rc = ngx_http_v2_read_request_body(r);

Expand Down Expand Up @@ -788,12 +799,35 @@ ngx_http_proxy_connect_tunnel(ngx_http_request_t *r,
pc = u->peer.connection;

#if (NGX_HTTP_V2)
/* rewrite c->send, c->recv to read/write HTTP/2 DATA frame */

#if 1
/*
* There is no low-level recv/send IO operations of http/v2 in nginx core,
* so we just need use high-level buf/chain IO operations.
*/

if (r->stream) {
if (from_upstream) {
ngx_http_v2_proxy_connect_process_downstream(r, do_write);
} else {
ngx_http_v2_proxy_connect_process_upstream(r, do_write);
}
return ;
}
#else

/*
* Alough there is no low-level IO operations, we can monitor them
* trickly. We need to rewrite c->send, c->recv interface to read/write
* HTTP/2 DATA frame
*/

if (r->stream) {
r->connection->send = ngx_http_v2_proxy_connect_send;
r->connection->recv = ngx_http_v2_proxy_connect_recv;
}
#endif

#endif

if (from_upstream) {
Expand Down Expand Up @@ -933,11 +967,252 @@ ngx_http_proxy_connect_tunnel(ngx_http_request_t *r,


#if (NGX_HTTP_V2)
/*
* data flow: downstream -> upstream
*
* referer to
* ngx_http_upstream_send_request()
* -> ngx_http_upstream_send_request_body()
*/
void
ngx_http_v2_proxy_connect_process_upstream(ngx_http_request_t *r,
ngx_uint_t do_write)
{
ngx_int_t rc;
ngx_chain_t *out, /**cl,*/ *ln;
ngx_connection_t *c;
ngx_http_proxy_connect_ctx_t *ctx;
ngx_http_proxy_connect_upstream_t *u;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"proxy_connect: h2 downstream -> upstream");

ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_connect_module);
u = ctx->u;

#if 0
if (!r->request_body_no_buffering) {

/* buffered request body */

if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;

} else {
out = NULL;
}

rc = ngx_output_chain(&u->output, out);

if (rc == NGX_AGAIN) {
u->request_body_blocked = 1;

} else {
u->request_body_blocked = 0;
}

goto done;
}

if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;

if (r->request_body->bufs) {
for (cl = out; cl->next; cl = cl->next) { /* void */ }
cl->next = r->request_body->bufs;
r->request_body->bufs = NULL;
}

c = u->peer.connection;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
return NGX_ERROR;
}

r->read_event_handler = ngx_http_upstream_read_request_handler;

} else {
out = NULL;
}
#else
out = NULL;
r->request_body_no_buffering = 1;
#endif

for ( ;; ) {

if (do_write) {
rc = ngx_output_chain(&u->output, out);

if (rc == NGX_ERROR) {
ngx_http_proxy_connect_finalize_request(r, u, NGX_ERROR);
return;
}

while (out) {
ln = out;
out = out->next;
ngx_free_chain(r->pool, ln);
}

if (rc == NGX_AGAIN) {
u->request_body_blocked = 1;

} else {
u->request_body_blocked = 0;
}

if (rc == NGX_OK && !r->reading_body) {
break;
}
}

if (r->reading_body) {
/* read client request body */

rc = ngx_http_read_unbuffered_request_body(r);

if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
ngx_http_proxy_connect_finalize_request(r, u, rc);
return;
}

out = r->request_body->bufs;
r->request_body->bufs = NULL;
}

/* stop if there is nothing to send */

if (out == NULL) {
rc = NGX_AGAIN;
break;
}

do_write = 1;
}

//done:

#if 0
if (!r->reading_body) {
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler =
ngx_http_upstream_rd_check_broken_connection;
}
}
#endif

c = u->peer.connection;

if (rc == NGX_AGAIN) {
if (!c->write->ready || u->request_body_blocked) {
ngx_add_timer(c->write, ctx->data_timeout);

} else if (c->write->timer_set) {
ngx_del_timer(c->write);
}

if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_proxy_connect_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

#if 0
if (c->write->ready && c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == -1) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_proxy_connect_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}
#endif

if (c->read->ready) {
ngx_post_event(c->read, &ngx_posted_events);
}

return;
}

/* rc == NGX_OK */

// TODO: how to handle timer

if (c->write->timer_set) {
ngx_del_timer(c->write);
}

#if 0
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == -1) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}

if (!u->conf->preserve_output) {
u->write_event_handler = ngx_http_upstream_dummy_handler;
}
#endif

if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_proxy_connect_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

#if 0
if (!u->request_body_sent) {
u->request_body_sent = 1;

if (u->header_sent) {
return;
}

ngx_add_timer(c->read, u->conf->read_timeout);

if (c->read->ready) {
ngx_http_upstream_process_header(r, u);
return;
}
}
#endif

return;
}


void
ngx_http_v2_proxy_connect_process_downstream(ngx_http_request_t *r,
ngx_uint_t do_write)
{
/* downstream -> upstream */

}


ssize_t
ngx_http_v2_proxy_connect_recv(ngx_connection_t *c, u_char *buf, size_t size)
{
ngx_http_request_t *r = c->data;
// r->reading_body = ; // TODO
// TODO: read body data
// ngx_http_upstream_send_request_body
// -> ngx_http_read_unbuffered_request_body
// ... get bufs from DATA frame into r->request_body->bufs
// -> then we need return r->request_body->bufs to caller
r->request_body_no_buffering = 1;

r->headers_in.content_length_n = -1;
Expand Down Expand Up @@ -981,7 +1256,6 @@ ngx_http_v2_proxy_connect_send(ngx_connection_t *c, u_char *buf, size_t size)
#endif



static void
ngx_http_proxy_connect_read_downstream(ngx_http_request_t *r)
{
Expand Down Expand Up @@ -1660,6 +1934,32 @@ ngx_http_proxy_connect_handler(ngx_http_request_t *r)
if (rc != NGX_OK) {
return rc;
}

{
/*
* init request body before handling DATA frames and init_upstream,
* otherwise the DATA frame may not be received in
* ngx_http_v2_state_read_data()?
*/
ngx_http_request_body_t *rb;

rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t));

if (rb == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
#if 0
ngx_http_proxy_connect_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
#endif
}

rb->rest = -1;
// TODO: changed to init_upstream ()
rb->post_handler = ngx_http_proxy_connect_read_downstream;

r->request_body = rb;
}
}
#endif

Expand Down

0 comments on commit 37c42ba

Please sign in to comment.