概述
upstream 机制使得 Nginx 成为一个反向代理服务器,Nginx 接收来自下游客户端的 http 请求,并处理该请求,同时根据该请求向上游服务器发送 tcp 请求报文,上游服务器会根据该请求返回相应地响应报文,Nginx 根据上游服务器的响应报文,决定是否向下游客户端转发响应报文。另外 upstream 机制提供了负载均衡的功能,可以将请求负载均衡到集群服务器的某个服务器上面。
启动 upstream
在 Nginx 中调用 ngx_http_upstream_init 方法启动 upstream 机制,但是在使用 upstream 机制之前必须调用 ngx_http_upstream_create 方法创建 ngx_http_upstream_t 结构体,因为默认情况下 ngx_http_request_t 结构体中的 upstream 成员是指向 NULL,该结构体的具体初始化工作还需由 HTTP 模块完成。有关 ngx_http_upstream_t 结构体 和ngx_http_upstream_conf_t 结构体的相关说明可参考文章《Nginx 中 upstream 机制》。
下面是函数 ngx_http_upstream_create 的实现:
/* 创建 ngx_http_upstream_t 结构体 */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
ngx_http_upstream_t *u;
u = r->upstream;
/*
* 若已经创建过ngx_http_upstream_t 且定义了cleanup成员,
* 则调用cleanup清理方法将原始结构体清除;
*/
if (u && u->cleanup) {
r->main->count++;
ngx_http_upstream_cleanup(r);
}
/* 从内存池分配ngx_http_upstream_t 结构体空间 */
u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
if (u == NULL) {
return NGX_ERROR;
}
/* 给ngx_http_request_t 结构体成员upstream赋值 */
r->upstream = u;
u->peer.log = r->connection->log;
u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
u->peer.lock = &r->connection->lock;
#endif
#if (NGX_HTTP_CACHE)
r->cache = NULL;
#endif
u->headers_in.content_length_n = -1;
return NGX_OK;
}
关于 upstream 机制的启动方法 ngx_http_upstream_init 的执行流程如下:
- 检查 Nginx 与下游服务器之间连接上的读事件是否在定时器中,即检查 timer_set 标志位是否为 1,若该标志位为 1,则把读事件从定时器中移除;
- 调用 ngx_http_upstream_init_request 方法启动 upstream 机制;
ngx_http_upstream_init_request 方法执行流程如下所示:
- 检查 ngx_http_upstream_t 结构体中的 store 标志位是否为 0;检查 ngx_http_request_t 结构体中的 post_action 标志位是否为0;检查 ngx_http_upstream_conf_t 结构体中的ignore_client_abort 是否为 0;若上面的标志位都为 0,则设置ngx_http_request_t 请求的读事件的回调方法为ngx_http_upstream_rd_check_broken_connection;设置写事件的回调方法为 ngx_http_upstream_wr_check_broken_connection;这两个方法都会调用 ngx_http_upstream_check_broken_connection方法检查 Nginx 与下游之间的连接是否正常,若出现错误,则终止连接;
- 若不满足上面的标志位,即至少有一个不为 0 ,调用请求中ngx_http_upstream_t 结构体中某个 HTTP 模块实现的create_request 方法,构造发往上游服务器的请求;
- 调用 ngx_http_cleanup_add 方法向原始请求的 cleanup 链表尾端添加一个回调 handler 方法,该回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
- 调用 ngx_http_upstream_connect 方法向上游服务器发起连接请求;
/* 初始化启动upstream机制 */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
ngx_connection_t *c;
/* 获取当前请求所对应的连接 */
c = r->connection;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http init upstream, client timer: %d", c->read->timer_set);
#if (NGX_HTTP_SPDY)
if (r->spdy_stream) {
ngx_http_upstream_init_request(r);
return;
}
#endif
/*
* 检查当前连接上读事件的timer_set标志位是否为1,若该标志位为1,
* 表示读事件在定时器机制中,则需要把它从定时器机制中移除;
* 因为在启动upstream机制后,就不需要对客户端的读操作进行超时管理;
*/
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
if (!c->write->active) {
if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
== NGX_ERROR)
{
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
}
ngx_http_upstream_init_request(r);
}
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
ngx_str_t *host;
ngx_uint_t i;
ngx_resolver_ctx_t *ctx, temp;
ngx_http_cleanup_t *cln;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
ngx_http_upstream_srv_conf_t *uscf, **uscfp;
ngx_http_upstream_main_conf_t *umcf;
if (r->aio) {
return;
}
u = r->upstream;
#if (NGX_HTTP_CACHE)
...
...
#endif
/* 文件缓存标志位 */
u->store = (u->conf->store || u->conf->store_lengths);
/*
* 检查ngx_http_upstream_t 结构中标志位 store;
* 检查ngx_http_request_t 结构中标志位 post_action;
* 检查ngx_http_upstream_conf_t 结构中标志位 ignore_client_abort;
* 若上面这些标志位为1,则表示需要检查Nginx与下游(即客户端)之间的TCP连接是否断开;
*/
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
/* 把当前请求包体结构保存在ngx_http_upstream_t 结构的request_bufs链表缓冲区中 */
if (r->request_body) {
u->request_bufs = r->request_body->bufs;
}
/* 调用create_request方法构造发往上游服务器的请求 */
if (u->create_request(r) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* 获取ngx_http_upstream_t结构中主动连接结构peer的local本地地址信息 */
u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);
/* 获取ngx_http_core_module模块的loc级别的配置项结构 */
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/* 初始化ngx_http_upstream_t结构中成员output向下游发送响应的方式 */
u->output.alignment = clcf->directio_alignment;
u->output.pool = r->pool;
u->output.bufs.num = 1;
u->output.bufs.size = clcf->client_body_buffer_size;
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
u->writer.pool = r->pool;
/* 添加用于表示上游响应的状态,例如:错误编码、包体长度等 */
if (r->upstream_states == NULL) {
r->upstream_states = ngx_array_create(r->pool, 1,
sizeof(ngx_http_upstream_state_t));
if (r->upstream_states == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
} else {
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
}
/*
* 调用ngx_http_cleanup_add方法原始请求的cleanup链表尾端添加一个回调handler方法,
* 该handler回调方法设置为ngx_http_upstream_cleanup,若当前请求结束时会调用该方法做一些清理工作;
*/
cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
cln->handler = ngx_http_upstream_cleanup;
cln->data = r;
u->cleanup = &cln->handler;
if (u->resolved == NULL) {
/* 若没有实现u->resolved标志位,则定义上游服务器的配置 */
uscf = u->conf->upstream;
} else {
/*
* 若实现了u->resolved标志位,则解析主机域名,指定上游服务器的地址;
*/
/*
* 若已经指定了上游服务器地址,则不需要解析,
* 直接调用ngx_http_upstream_connection方法向上游服务器发起连接;
* 并return从当前函数返回;
*/
if (u->resolved->sockaddr) {
if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
return;
}
/*
* 若还没指定上游服务器的地址,则需解析主机域名;
* 若成功解析出上游服务器的地址和端口号,
* 则调用ngx_http_upstream_connection方法向上游服务器发起连接;
*/
host = &u->resolved->host;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
uscf = uscfp[i];
if (uscf->host.len == host->len
&& ((uscf->port == 0 && u->resolved->no_port)
|| uscf->port == u->resolved->port)
&& ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
{
goto found;
}
}
if (u->resolved->port == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream "%V"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
temp.name = *host;
ctx = ngx_resolve_start(clcf->resolver, &temp);
if (ctx == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (ctx == NGX_NO_RESOLVER) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no resolver defined to resolve %V", host);
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
return;
}
ctx->name = *host;
ctx->handler = ngx_http_upstream_resolve_handler;
ctx->data = r;
ctx->timeout = clcf->resolver_timeout;
u->resolved->ctx = ctx;
if (ngx_resolve_name(ctx) != NGX_OK) {
u->resolved->ctx = NULL;
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
found:
if (uscf == NULL) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"no upstream configuration");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if (uscf->peer.init(r, uscf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
}
static void
ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
{
ngx_http_upstream_check_broken_connection(r, r->connection->read);
}
static void
ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
{
ngx_http_upstream_check_broken_connection(r, r->connection->write);
}
建立连接
upstream 机制与上游服务器建立 TCP 连接时,采用的是非阻塞模式的套接字,即发起连接请求之后立即返回,不管连接是否建立成功,若没有立即建立成功,则需在 epoll 事件机制中监听该套接字。向上游服务器发起连接请求由函数ngx_http_upstream_connect 实现。在分析 ngx_http_upstream_connect 方法之前,首先分析下 ngx_event_connect_peer 方法,因为该方法会被ngx_http_upstream_connect 方法调用。
ngx_event_connect_peer 方法的执行流程如下所示:
- 调用 ngx_socket 方法创建一个 TCP 套接字;
- 调用 ngx_nonblocking 方法设置该 TCP 套接字为非阻塞模式;
- 设置套接字连接接收和发送网络字符流的方法;
- 设置套接字连接上读、写事件方法;
- 将 TCP 套接字以期待 EPOLLIN | EPOLLOUT 事件的方式添加到epoll 事件机制中;
- 调用 connect 方法向服务器发起 TCP 连接请求;
ngx_http_upstream_connect 方法表示向上游服务器发起连接请求,其执行流程如下所示:
调用 ngx_event_connect_peer 方法主动向上游服务器发起连接请求,需要注意的是该方法已经将相应的套接字注册到epoll事件机制来监听读、写事件,该方法返回值为 rc;
若 rc = NGX_ERROR,表示发起连接失败,则调用ngx_http_upstream_finalize_request 方法关闭连接请求,并 return 从当前函数返回;
- 若 rc = NGX_BUSY,表示当前上游服务器处于不活跃状态,则调用 ngx_http_upstream_next 方法根据传入的参数尝试重新发起连接请求,并 return 从当前函数返回;
- 若 rc = NGX_DECLINED,表示当前上游服务器负载过重,则调用 ngx_http_upstream_next 方法尝试与其他上游服务器建立连接,并 return 从当前函数返回;
- 设置上游连接 ngx_connection_t 结构体的读事件、写事件的回调方法 handler 都为 ngx_http_upstream_handler,设置 ngx_http_upstream_t 结构体的写事件 write_event_handler 的回调为 ngx_http_upstream_send_request_handler,读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_header;
- 若 rc = NGX_AGAIN,表示当前已经发起连接,但是没有收到上游服务器的确认应答报文,即上游连接的写事件不可写,则需调用 ngx_add_timer 方法将上游连接的写事件添加到定时器中,管理超时确认应答;
- 若 rc = NGX_OK,表示成功建立连接,则调用 ngx_http_upsream_send_request 方法向上游服务器发送请求;
/* 向上游服务器建立连接 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_time_t *tp;
ngx_connection_t *c;
r->connection->log->action = "connecting to upstream";
if (u->state && u->state->response_sec) {
tp = ngx_timeofday();
u->state->response_sec = tp->sec - u->state->response_sec;
u->state->response_msec = tp->msec - u->state->response_msec;
}
u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
tp = ngx_timeofday();
u->state->response_sec = tp->sec;
u->state->response_msec = tp->msec;
/* 向上游服务器发起连接 */
rc = ngx_event_connect_peer(&u->peer);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream connect: %i", rc);
/* 下面根据rc不同返回值进行分析 */
/* 若建立连接失败,则关闭当前请求,并return从当前函数返回 */
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->state->peer = u->peer.name;
/*
* 若返回rc = NGX_BUSY,表示当前上游服务器不活跃,
* 则调用ngx_http_upstream_next向上游服务器重新发起连接,
* 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
* 并return从当前函数返回;
*/
if (rc == NGX_BUSY) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
return;
}
/*
* 若返回rc = NGX_DECLINED,表示当前上游服务器负载过重,
* 则调用ngx_http_upstream_next向上游服务器重新发起连接,
* 实际上,该方法最终还是调用ngx_http_upstream_connect方法;
* 并return从当前函数返回;
*/
if (rc == NGX_DECLINED) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
c = u->peer.connection;
c->data = r;
/* 设置当前连接ngx_connection_t 上读、写事件的回调方法 */
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;
/* 设置upstream机制的读、写事件的回调方法 */
u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;
c->sendfile &= r->connection->sendfile;
u->output.sendfile = c->sendfile;
if (c->pool == NULL) {
/* we need separate pool here to be able to cache SSL connections */
c->pool = ngx_create_pool(128, r->connection->log);
if (c->pool == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
c->log = r->connection->log;
c->pool->log = c->log;
c->read->log = c->log;
c->write->log = c->log;
/* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
u->writer.out = NULL;
u->writer.last = &u->writer.out;
u->writer.connection = c;
u->writer.limit = 0;
/*
* 检查当前ngx_http_upstream_t 结构的request_sent标志位,
* 若该标志位为1,则表示已经向上游服务器发送请求,即本次发起连接失败;
* 则调用ngx_http_upstream_reinit方法重新向上游服务器发起连接;
*/
if (u->request_sent) {
if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
if (r->request_body
&& r->request_body->buf
&& r->request_body->temp_file
&& r == r->main)
{
/*
* the r->request_body->buf can be reused for one request only,
* the subrequests should allocate their own temporary bufs
*/
u->output.free = ngx_alloc_chain_link(r->pool);
if (u->output.free == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
u->output.free->buf = r->request_body->buf;
u->output.free->next = NULL;
u->output.allocated = 1;
r->request_body->buf->pos = r->request_body->buf->start;
r->request_body->buf->last = r->request_body->buf->start;
r->request_body->buf->tag = u->output.tag;
}
u->request_sent = 0;
/*
* 若返回rc = NGX_AGAIN,表示没有收到上游服务器允许建立连接的应答;
* 由于写事件已经添加到epoll事件机制中等待可写事件发生,
* 所有在这里只需将当前连接的写事件添加到定时器机制中进行超时管理;
* 并return从当前函数返回;
*/
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}
#if (NGX_HTTP_SSL)
if (u->ssl && c->ssl == NULL) {
ngx_http_upstream_ssl_init_connection(r, u, c);
return;
}
#endif
/*
* 若返回值rc = NGX_OK,表示连接成功建立,
* 调用此方法向上游服务器发送请求 */
ngx_http_upstream_send_request(r, u);
}
发送请求
当 Nginx 与上游服务器成功建立连接之后,会调用 ngx_http_upstream_send_request 方法发送请求,若是该方法不能一次性把请求内容发送完成时,则需等待 epoll 事件机制的写事件发生,若写事件发生,则会调用写事件 write_event_handler 的回调方法 ngx_http_upstream_send_request_handler 继续发送请求,并且有可能会多次调用该写事件的回调方法, 直到把请求发送完成。
下面是 ngx_http_upstream_send_request 方法的执行流程:
- 检查 ngx_http_upstream_t 结构体中的标志位 request_sent 是否为 0,若为 0 表示未向上游发送请求。 且此时调用 ngx_http_upstream_test_connect 方法测试是否与上游建立连接,若返回非 NGX_OK, 则需调用 ngx_http_upstream_next 方法试图与上游建立连接,并return 从当前函数返回;
- 调用 ngx_output_chain 方法向上游发送保存在 request_bufs 链表中的请求数据,该方法返回值为 rc,并设置 request_sent 标志位为 1,检查连接上写事件 timer_set 标志位是否为1,若为 1 调用ngx_del_timer 方法将写事件从定时器中移除;
- 若 rc = NGX_ERROR,表示当前连接上出错,则调用 ngx_http_upstream_next 方法尝试再次与上游建立连接,并 return 从当前函数返回;
- 若 rc = NGX_AGAIN,并是当前请求数据未完全发送,则需将剩余的请求数据保存在 ngx_http_upstream_t 结构体的 output 成员中,并且调用 ngx_add_timer 方法将当前连接上的写事件添加到定时器中,调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中,等待可写事件发生,并return 从当前函数返回;
- 若 rc = NGX_OK,表示已经发送全部请求数据,则准备接收来自上游服务器的响应报文;
- 先调用 ngx_add_timer 方法将当前连接的读事件添加到定时器机制中,检测接收响应是否超时,检查当前连接上的读事件是否准备就绪,即标志位 ready 是否为1,若该标志位为 1,则调用 ngx_http_upstream_process_header 方法开始处理响应头部,并 return 从当前函数返回;
- 若当前连接上读事件的标志位 ready 为0,表示暂时无可读数据,则需等待读事件再次被触发,由于原始读事件的回调方法为 ngx_http_upstream_process_header,所有无需重新设置。由于请求已经全部发送,防止写事件的回调方法 ngx_http_upstream_send_request_handler 再次被触发,因此需要重新设置写事件的回调方法为 ngx_http_upstream_dummy_handler,该方法实际上不执行任何操作,同时调用 ngx_handle_write_event 方法将写事件注册到 epoll 事件机制中;
/* 向上游服务器发送请求 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_connection_t *c;
/* 获取当前连接 */
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream send request");
/*
* 若标志位request_sent为0,表示还未发送请求;
* 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
* 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
* 并return从当前函数返回;
*/
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
c->log->action = "sending request to upstream";
/*
* 调用ngx_output_chain方法向上游发送保存在request_bufs链表中的请求数据;
* 值得注意的是该方法的第二个参数可以是NULL也可以是request_bufs,那怎么来区分呢?
* 若是第一次调用该方法发送request_bufs链表中的请求数据时,request_sent标志位为0,
* 此时,第二个参数自然就是request_bufs了,那么为什么会有NULL作为参数的情况呢?
* 当在第一次调用该方法时,并不能一次性把所有request_bufs中的数据发送完毕时,
* 此时,会把剩余的数据保存在output结构里面,并把标志位request_sent设置为1,
* 因此,再次发送请求数据时,不用指定request_bufs参数,因为此时剩余数据已经保存在output中;
*/
rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
/* 向上游服务器发送请求之后,把request_sent标志位设置为1 */
u->request_sent = 1;
/* 下面根据不同rc的返回值进行判断 */
/*
* 若返回值rc=NGX_ERROR,表示当前连接上出错,
* 将错误信息传递给ngx_http_upstream_next方法,
* 该方法根据错误信息决定是否重新向上游服务器发起连接;
* 并return从当前函数返回;
*/
if (rc == NGX_ERROR) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/*
* 检查当前连接上写事件的标志位timer_set是否为1,
* 若该标志位为1,则需把写事件从定时器机制中移除;
*/
if (c->write->timer_set) {
ngx_del_timer(c->write);
}
/*
* 若返回值rc = NGX_AGAIN,表示请求数据并未完全发送,
* 即有剩余的请求数据保存在output中,但此时,写事件已经不可写,
* 则调用ngx_add_timer方法把当前连接上的写事件添加到定时器机制,
* 并调用ngx_handle_write_event方法将写事件注册到epoll事件机制中;
* 并return从当前函数返回;
*/
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->send_timeout);
if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
/* rc == NGX_OK */
/*
* 若返回值 rc = NGX_OK,表示已经发送完全部请求数据,
* 准备接收来自上游服务器的响应报文,则执行以下程序;
*/
if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
if (ngx_tcp_push(c->fd) == NGX_ERROR) {
ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
ngx_tcp_push_n " failed");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
}
/* 将当前连接上读事件添加到定时器机制中 */
ngx_add_timer(c->read, u->conf->read_timeout);
/*
* 若此时,读事件已经准备就绪,
* 则调用ngx_http_upstream_process_header方法开始接收并处理响应头部;
* 并return从当前函数返回;
*/
if (c->read->ready) {
ngx_http_upstream_process_header(r, u);
return;
}
/*
* 若当前读事件未准备就绪;
* 则把写事件的回调方法设置为ngx_http_upstream_dumy_handler方法(不进行任何实际操作);
* 并把写事件注册到epoll事件机制中;
*/
u->write_event_handler = ngx_http_upstream_dummy_handler;
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
}
当无法一次性将请求内容全部发送完毕,则需等待 epoll 事件机制的写事件发生,一旦发生就会调用回调方法 ngx_http_upstream_send_request_handler。
ngx_http_upstream_send_request_handler 方法的执行流程如下所示:
- 检查连接上写事件是否超时,即timedout 标志位是否为 1,若为 1 表示已经超时,则调用 ngx_http_upstream_next 方法重新向上游发起连接请求,并 return 从当前函数返回;
- 若标志位 timedout 为0,即不超时,检查 header_sent 标志位是否为 1,表示已经接收到来自上游服务器的响应头部,则不需要再向上游发送请求,将写事件的回调方法设置为 ngx_http_upstream_dummy_handler,同时将写事件注册到 epoll 事件机制中,并return 从当前函数返回;
- 若标志位 header_sent 为 0,则调用 ngx_http_upstream_send_request 方法向上游发送请求数据;
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream send request handler");
/* 检查当前连接上写事件的超时标志位 */
if (c->write->timedout) {
/* 执行超时重连机制 */
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
#if (NGX_HTTP_SSL)
if (u->ssl && c->ssl == NULL) {
ngx_http_upstream_ssl_init_connection(r, u, c);
return;
}
#endif
/* 已经接收到上游服务器的响应头部,则不需要再向上游服务器发送请求数据 */
if (u->header_sent) {
/* 将写事件的回调方法设置为不进行任何实际操作的方法ngx_http_upstream_dumy_handler */
u->write_event_handler = ngx_http_upstream_dummy_handler;
/* 将写事件注册到epoll事件机制中,并return从当前函数返回 */
(void) ngx_handle_write_event(c->write, 0);
return;
}
/* 若没有接收来自上游服务器的响应头部,则需向上游服务器发送请求数据 */
ngx_http_upstream_send_request(r, u);
}
接收响应
接收响应头部
当 Nginx 已经向上游发送请求,准备开始接收来自上游的响应头部,由方法 ngx_http_upstream_process_header 实现,该方法接收并解析响应头部。
ngx_http_upstream_process_header 方法的执行流程如下:
- 检查上游连接上的读事件是否超时,若标志位 timedout 为 1,则表示超时,此时调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
- 若标志位 timedout 为 0,接着检查 ngx_http_upstream_t 结构体中的标志位 request_sent,若该标志位为 0,表示未向上游发送请求,同时调用 ngx_http_upstream_test_connect 方法测试连接状态,若该方法返回值为非 NGX_OK,表示与上游已经断开连接,则调用 ngx_http_upstream_next 方法重新与上游建立连接,并 return 从当前函数返回;
- 检查 ngx_http_upstream_t 结构体中接收响应头部的 buffer 缓冲区是否有内存空间以便接收响应头部,若 buffer.start 为 NULL,表示该缓冲区为空,则需调用 ngx_palloc 方法分配内存,该内存大小 buffer_size 由 ngx_http_upstream_conf_t 配置结构体的 buffer_size 成员指定;
调用 recv 方法开始接收来自上游服务器的响应头部,并根据该方法的返回值 n 进行判断:
若 n = NGX_AGAIN,表示读事件未准备就绪,需要等待下次读事件被触发时继续接收响应头部,此时,调用 ngx_add_timer 方法将读事件添加到定时器中,同时调用 ngx_handle_read_event 方法将读事件注册到epoll 事件机制中,并 return 从当前函数返回;
- 若 n = NGX_ERROR 或 n = 0,表示上游连接发生错误 或 上游服务器主动关闭连接,则调用 ngx_http_upstream_next 方法重新发起连接请求,并 return 从当前函数返回;
若 n 大于 0,表示已经接收到响应头部,此时,调用 ngx_http_upstream_t 结构体中由 HTTP 模块实现的 process_header 方法解析响应头部,且返回 rc 值;
若 rc = NGX_AGAIN,表示接收到的响应头部不完整,检查接收缓冲区 buffer 是否还有剩余的内存空间,若缓冲区没有剩余的内存空间,表示接收到的响应头部过大,此时调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;若缓冲区还有剩余的内存空间,则continue 继续接收响应头部;
- 若 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,表示接收到的响应头部是非法的,则调用 ngx_http_upstream_next 方法重新建立连接,并 return 从当前函数返回;
- 若 rc = NGX_ERROR,表示连接出错,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
- 若 rc = NGX_OK,表示已接收到完整的响应头部,则调用 ngx_http_upstream_process_headers 方法处理已解析的响应头部,该方法会将已解析出来的响应头部保存在 ngx_http_request_t 结构体中的 headers_out 成员;
检查 ngx_http_request_t 结构体的 subrequest_in_memory 成员决定是否需要转发响应给下游服务器;
若 subrequest_in_memory 为 0,表示需要转发响应给下游服务器,则调用 ngx_http_upstream_send_response 方法开始转发响应给下游服务器,并 return 从当前函数返回;
若 subrequest_in_memory 为 1,表示不需要将响应转发给下游,此时检查 HTTP 模块是否定义了 ngx_http_upstream_t 结构体中的 input_filter 方法处理响应包体;
若没有定义 input_filter 方法,则使用 upstream 机制默认方法 ngx_http_upstream_non_buffered_filter 代替 input_filter 方法;
- 若定义了自己的 input_filter 方法,则首先调用 input_filter_init 方法为处理响应包体做初始化工作;
- 检查接收缓冲区 buffer 在解析完响应头部之后剩余的字符流,若有剩余的字符流,则表示已经预接收了响应包体,此时调用 input_filter 方法处理响应包体;
- 设置 upstream 机制读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_body_in_memory,并调用该方法开始接收并解析响应包体;
/* 接收并解析响应头部 */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_connection_t *c;
c = u->peer.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process header");
c->log->action = "reading response header from upstream";
/* 检查当前连接上的读事件是否超时 */
if (c->read->timedout) {
/*
* 若标志位timedout为1,表示读事件超时;
* 则把超时错误传递给ngx_http_upstream_next方法,
* 该方法根据允许的错误进行重连接策略;
* 并return从当前函数返回;
*/
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
return;
}
/*
* 若标志位request_sent为0,表示还未发送请求;
* 且ngx_http_upstream_test_connect方法返回非NGX_OK,标志当前还未与上游服务器成功建立连接;
* 则需要调用ngx_http_upstream_next方法尝试与下一个上游服务器建立连接;
* 并return从当前函数返回;
*/
if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/*
* 检查ngx_http_upstream_t结构体中接收响应头部的buffer缓冲区;
* 若接收缓冲区buffer未分配内存,则调用ngx_palloce方法分配内存,
* 该内存的大小buffer_size由ngx_http_upstream_conf_t配置结构的buffer_size指定;
*/
if (u->buffer.start == NULL) {
u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
if (u->buffer.start == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* 调整接收缓冲区buffer,准备接收响应头部 */
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
u->buffer.end = u->buffer.start + u->conf->buffer_size;
/* 表示该缓冲区内存可被复用、数据可被改变 */
u->buffer.temporary = 1;
u->buffer.tag = u->output.tag;
/* 初始化headers_in的成员headers链表 */
if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
#if (NGX_HTTP_CACHE)
if (r->cache) {
u->buffer.pos += r->cache->header_start;
u->buffer.last = u->buffer.pos;
}
#endif
}
for ( ;; ) {
/* 调用recv方法从当前连接上读取响应头部数据 */
n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
/* 下面根据 recv 方法不同返回值 n 进行判断 */
/*
* 若返回值 n = NGX_AGAIN,表示读事件未准备就绪,
* 需等待下次读事件被触发时继续接收响应头部,
* 即将读事件注册到epoll事件机制中,等待可读事件发生;
* 并return从当前函数返回;
*/
if (n == NGX_AGAIN) {
#if 0
ngx_add_timer(rev, u->read_timeout);
#endif
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
return;
}
if (n == 0) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream prematurely closed connection");
}
/*
* 若返回值 n = NGX_ERROR 或 n = 0,则表示上游服务器已经主动关闭连接;
* 此时,调用ngx_http_upstream_next方法决定是否重新发起连接;
* 并return从当前函数返回;
*/
if (n == NGX_ERROR || n == 0) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
return;
}
/* 若返回值 n 大于 0,表示已经接收到响应头部 */
u->buffer.last += n;
#if 0
u->valid_header_in = 0;
u->peer.cached = 0;
#endif
/*
* 调用ngx_http_upstream_t结构体中process_header方法开始解析响应头部;
* 并根据该方法返回值进行不同的判断;
*/
rc = u->process_header(r);
/*
* 若返回值 rc = NGX_AGAIN,表示接收到的响应头部不完整,
* 需等待下次读事件被触发时继续接收响应头部;
* continue继续接收响应;
*/
if (rc == NGX_AGAIN) {
if (u->buffer.last == u->buffer.end) {
ngx_log_error(NGX_LOG_ERR, c->log, 0,
"upstream sent too big header");
ngx_http_upstream_next(r, u,
NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
continue;
}
break;
}
/*
* 若返回值 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,
* 则表示接收到的响应头部是非法的,
* 调用ngx_http_upstream_next方法决定是否重新发起连接;
* 并return从当前函数返回;
*/
if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
return;
}
/*
* 若返回值 rc = NGX_ERROR,表示出错,
* 则调用ngx_http_upstream_finalize_request方法结束该请求;
* 并return从当前函数返回;
*/
if (rc == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
/* rc == NGX_OK */
/*
* 若返回值 rc = NGX_OK,表示成功解析到完整的响应头部;*/
if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {
if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
return;
}
if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
return;
}
}
/* 调用ngx_http_upstream_process_headers方法处理已解析处理的响应头部 */
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}
/*
* 检查ngx_http_request_t 结构体的subrequest_in_memory成员决定是否转发响应给下游服务器;
* 若该标志位为0,则需调用ngx_http_upstream_send_response方法转发响应给下游服务器;
* 并return从当前函数返回;
*/
if (!r->subrequest_in_memory) {
ngx_http_upstream_send_response(r, u);
return;
}
/* 若不需要转发响应,则调用ngx_http_upstream_t中的input_filter方法处理响应包体 */
/* subrequest content in memory */
/*
* 若HTTP模块没有定义ngx_http_upstream_t中的input_filter处理方法;
* 则使用upstream机制默认方法ngx_http_upstream_non_buffered_filter;
*
* 若HTTP模块实现了input_filter方法,则不使用upstream默认的方法;
*/
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
/*
* 调用input_filter_init方法为处理包体做初始化工作;
*/
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 检查接收缓冲区是否有剩余的响应数据;
* 因为响应头部已经解析完毕,若接收缓冲区还有未被解析的剩余数据,
* 则该数据就是响应包体;
*/
n = u->buffer.last - u->buffer.pos;
/*
* 若接收缓冲区有剩余的响应包体,调用input_filter方法开始处理已接收到响应包体;
*/
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
/* 调用input_filter方法处理响应包体 */
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
/* 设置upstream机制的读事件回调方法read_event_handler为ngx_http_upstream_process_body_in_memory */
u->read_event_handler = ngx_http_upstream_process_body_in_memory;
/* 调用ngx_http_upstream_process_body_in_memory方法开始处理响应包体 */
ngx_http_upstream_process_body_in_memory(r, u);
}
接收响应包体
接收并解析响应包体由 ngx_http_upstream_process_body_in_memory 方法实现;
ngx_http_upstream_process_body_in_memory 方法的执行流程如下所示:
- 检查上游连接上读事件是否超时,若标志位 timedout 为 1,则表示已经超时,此时调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
检查接收缓冲区 buffer 是否还有剩余的内存空间,若没有剩余的内存空间,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;若有剩余的内存空间则调用 recv 方法开始接收响应包体;
若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体,调用 ngx_handle_read_event 方法将读事件注册到 epoll 事件机制中,同时将读事件添加到定时器机制中;
- 若返回值 n = 0 或 n = NGX_ERROR,则调用 ngx_http_upstream_finalize_request 方法结束请求,并 return 从当前函数返回;
若返回值 n 大于 0,则表示成功接收到响应包体,调用 input_filter 方法开始处理响应包体,检查读事件的 ready 标志位;
若标志位 ready 为 1,表示仍有可读的响应包体数据,因此回到步骤 2 继续调用 recv 方法读取响应包体,直到读取完毕;
- 若标志位 ready 为 0,则调用 ngx_handle_read_event 方法将读事件注册到epoll事件机制中,同时调用 ngx_add_timer 方法将读事件添加到定时器机制中;
/* 接收并解析响应包体 */
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
size_t size;
ssize_t n;
ngx_buf_t *b;
ngx_event_t *rev;
ngx_connection_t *c;
c = u->peer.connection;
rev = c->read;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process body on memory");
/*
* 检查读事件标志位timedout是否超时,若该标志位为1,表示响应已经超时;
* 则调用ngx_http_upstream_finalize_request方法结束请求;
* 并return从当前函数返回;
*/
if (rev->timedout) {
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
return;
}
b = &u->buffer;
for ( ;; ) {
/* 检查当前接收缓冲区是否剩余的内存空间 */
size = b->end - b->last;
/*
* 若接收缓冲区不存在空闲的内存空间,
* 则调用ngx_http_upstream_finalize_request方法结束请求;
* 并return从当前函数返回;
*/
if (size == 0) {
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
"upstream buffer is too small to read response");
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 若接收缓冲区有可用的内存空间,
* 则调用recv方法开始接收响应包体;
*/
n = c->recv(c, b->last, size);
/*
* 若返回值 n = NGX_AGAIN,表示等待下一次触发读事件再接收响应包体;
*/
if (n == NGX_AGAIN) {
break;
}
/*
* 若返回值n = 0(表示上游服务器主动关闭连接),或n = NGX_ERROR(表示出错);
* 则调用ngx_http_upstream_finalize_request方法结束请求;
* 并return从当前函数返回;
*/
if (n == 0 || n == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, n);
return;
}
/* 若返回值 n 大于0,表示成功读取到响应包体 */
u->state->response_length += n;
/* 调用input_filter方法处理本次接收到的响应包体 */
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 检查读事件的ready标志位,若为1,继续读取响应包体 */
if (!rev->ready) {
break;
}
}
if (u->length == 0) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}
/*
* 若读事件的ready标志位为0,表示读事件未准备就绪,
* 则将读事件注册到epoll事件机制中,添加到定时器机制中;
* 读事件的回调方法不改变,即依旧为ngx_http_upstream_process_body_in_memory;
*/
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (rev->active) {
ngx_add_timer(rev, u->conf->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
转发响应
下面看下 upstream 处理上游响应包体的三种方式:
- 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 1 时,upstream 不转发响应包体到下游,并由HTTP 模块实现的input_filter() 方法处理包体;
- 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 1 时,upstream 将开启更多的内存和磁盘文件用于缓存上游的响应包体(此时,上游网速更快),并转发响应包体;
- 当请求结构体 ngx_http_request_t 中的成员subrequest_in_memory 标志位为 0 时,且ngx_http_upstream_conf_t 配置结构体中的成员buffering 标志位为 0 时,upstream 将使用固定大小的缓冲区来转发响应包体;
转发响应由函数 ngx_http_upstream_send_response 实现,该函数的执行流程如下:
- 调用 ngx_http_send_header 方法转发响应头部,并将 ngx_http_upstream_t 结构体中的 header_sent 标志位设置为 1,表示已经转发响应头部;
- 若临时文件还保存着请求包体,则需调用 ngx_pool_run_cleanup_filter 方法清理临时文件;
- 检查标志位 buffering,若该标志位为 1,表示需要开启文件缓存,若该标志位为 0,则不需要开启文件缓存,只需要以固定的内存块大小转发响应包体即可;
若标志位 buffering 为0;
则检查 HTTP 模块是否实现了自己的 input_filter 方法,若没有则使用 upstream 机制默认的方法 ngx_http_upstream_non_buffered_filter;
- 设置 ngx_http_upstream_t 结构体中读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_non_buffered_upstream,当接收上游响应时,会通过 ngx_http_upstream_handler 方法最终调用 ngx_http_upstream_process_non_buffered_uptream 来接收响应;
- 设置 ngx_http_upstream_t 结构体中写事件 write_event_handler 的回调方法为 ngx_http_upstream_process_non_buffered_downstream,当向下游发送数据时,会通过 ngx_http_handler 方法最终调用 ngx_http_upstream_process_non_buffered_downstream 方法来发送响应包体;
- 调用 input_filter_init 方法为 input_filter 方法处理响应包体做初始化工作;
检查接收缓冲区 buffer 在解析完响应头部之后,是否还有剩余的响应数据,若有表示预接收了响应包体:
若在解析响应头部区间,预接收了响应包体,则调用 input_filter 方法处理该部分预接收的响应包体,并调用 ngx_http_upstream_process_non_buffered_downstream 方法转发本次接收到的响应包体给下游服务器;
若在解析响应头部区间,没有接收响应包体,则首先清空接收缓冲区 buffer 以便复用来接收响应包体,检查上游连接上读事件是否准备就绪,若标志位 ready 为1,表示准备就绪,则调用 ngx_http_upstream_process_non_buffered_upstream 方法接收上游响应包体;若标志位 ready 为 0,则 return 从当前函数返回;
若标志位 buffering 为1;
初始化 ngx_http_upstream_t 结构体中的 ngx_event_pipe_t pipe 成员;
- 调用 input_filter_init 方法为 input_filter 方法处理响应包体做初始化工作;
- 设置上游连接上的读事件 read_event_handler 的回调方法为 ngx_http_upstream_process_upstream;
- 设置上游连接上的写事件 write_event_handler 的回调方法为 ngx_http_upstream_process_downstream;
- 调用 ngx_http_upstream_proess_upstream 方法处理由上游服务器发来的响应包体;
/* 转发响应包体 */
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
int tcp_nodelay;
ssize_t n;
ngx_int_t rc;
ngx_event_pipe_t *p;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
/* 调用ngx_http_send_hander方法向下游发送响应头部 */
rc = ngx_http_send_header(r);
if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
/* 将标志位header_sent设置为1 */
u->header_sent = 1;
if (u->upgrade) {
ngx_http_upstream_upgrade(r, u);
return;
}
/* 获取Nginx与下游之间的TCP连接 */
c = r->connection;
if (r->header_only) {
if (u->cacheable || u->store) {
if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
ngx_connection_error(c, ngx_socket_errno,
ngx_shutdown_socket_n " failed");
}
r->read_event_handler = ngx_http_request_empty_handler;
r->write_event_handler = ngx_http_request_empty_handler;
c->error = 1;
} else {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
}
/* 若临时文件保存着请求包体,则调用ngx_pool_run_cleanup_file方法清理临时文件的请求包体 */
if (r->request_body && r->request_body->temp_file) {
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/*
* 若标志位buffering为0,转发响应时以下游服务器网速优先;
* 即只需分配固定的内存块大小来接收来自上游服务器的响应并转发,
* 当该内存块已满,则暂停接收来自上游服务器的响应数据,
* 等待把内存块的响应数据转发给下游服务器后有剩余内存空间再继续接收响应;
*/
if (!u->buffering) {
/*
* 若HTTP模块没有实现input_filter方法,
* 则采用upstream机制默认的方法ngx_http_upstream_non_buffered_filter;
*/
if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}
/*
* 设置ngx_http_upstream_t结构体中读事件的回调方法为ngx_http_upstream_non_buffered_upstream,(即读取上游响应的方法);
* 设置当前请求ngx_http_request_t结构体中写事件的回调方法为ngx_http_upstream_process_non_buffered_downstream,(即转发响应到下游的方法);
*/
u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
r->write_event_handler =
ngx_http_upstream_process_non_buffered_downstream;
r->limit_rate = 0;
/* 调用input_filter_init为input_filter方法处理响应包体做初始化工作 */
if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(c, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
c->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
/* 检查解析完响应头部后接收缓冲区buffer是否已接收了响应包体 */
n = u->buffer.last - u->buffer.pos;
/* 若接收缓冲区已经接收了响应包体 */
if (n) {
u->buffer.last = u->buffer.pos;
u->state->response_length += n;
/* 调用input_filter方法开始处理响应包体 */
if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 调用该方法把本次接收到的响应包体转发给下游服务器 */
ngx_http_upstream_process_non_buffered_downstream(r);
} else {
/* 若接收缓冲区中没有响应包体,则将其清空,即复用这个缓冲区 */
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;
if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/*
* 若当前连接上读事件已准备就绪,
* 则调用ngx_http_upstream_process_non_buffered_upstream方法接收响应包体并处理;
*/
if (u->peer.connection->read->ready || u->length == 0) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}
return;
}
/*
* 若ngx_http_upstream_t结构体的buffering标志位为1,则转发响应包体时以上游网速优先;
* 即分配更多的内存和缓存,即一直接收来自上游服务器的响应,把来自上游服务器的响应保存的内存或缓存中;
*/
/* TODO: preallocate event_pipe bufs, look "Content-Length" */
#if (NGX_HTTP_CACHE)
...
...
#endif
/* 初始化ngx_event_pipe_t结构体 p */
p = u->pipe;
p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;
p->busy_size = u->conf->busy_buffers_size;
p->upstream = u->peer.connection;
p->downstream = c;
p->pool = r->pool;
p->log = c->log;
p->cacheable = u->cacheable || u->store;
p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (p->temp_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->temp_file->file.fd = NGX_INVALID_FILE;
p->temp_file->file.log = c->log;
p->temp_file->path = u->conf->temp_path;
p->temp_file->pool = r->pool;
if (p->cacheable) {
p->temp_file->persistent = 1;
} else {
p->temp_file->log_level = NGX_LOG_WARN;
p->temp_file->warn = "an upstream response is buffered "
"to a temporary file";
}
p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;
/* 初始化预读链表缓冲区preread_bufs */
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->preread_bufs->buf = &u->buffer;
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;
p->preread_size = u->buffer.last - u->buffer.pos;
if (u->cacheable) {
p->buf_to_file = ngx_calloc_buf(r->pool);
if (p->buf_to_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
p->buf_to_file->start = u->buffer.start;
p->buf_to_file->pos = u->buffer.start;
p->buf_to_file->last = u->buffer.pos;
p->buf_to_file->temporary = 1;
}
if (ngx_event_flags & NGX_USE_AIO_EVENT) {
/* the posted aio operation may corrupt a shadow buffer */
p->single_buf = 1;
}
/* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
p->free_bufs = 1;
/*
* event_pipe would do u->buffer.last += p->preread_size
* as though these bytes were read
*/
u->buffer.last = u->buffer.pos;
if (u->conf->cyclic_temp_file) {
/*
* we need to disable the use of sendfile() if we use cyclic temp file
* because the writing a new data may interfere with sendfile()
* that uses the same kernel file pages (at least on FreeBSD)
*/
p->cyclic_temp_file = 1;
c->sendfile = 0;
} else {
p->cyclic_temp_file = 0;
}
p->read_timeout = u->conf->read_timeout;
p->send_timeout = clcf->send_timeout;
p->send_lowat = clcf->send_lowat;
p->length = -1;
/* 调用input_filter_init方法进行初始化工作 */
if (u->input_filter_init
&& u->input_filter_init(p->input_ctx) != NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
/* 设置上游读事件的方法 */
u->read_event_handler = ngx_http_upstream_process_upstream;
/* 设置下游写事件的方法 */
r->write_event_handler = ngx_http_upstream_process_downstream;
/* 处理上游响应包体 */
ngx_http_upstream_process_upstream(r, u);
}
当以下游网速优先转发响应包体给下游时,由函数 ngx_http_upstream_process_non_buffered_downstrean 实现,该函数的执行流程如下所示:
- 检查下游连接上写事件是否超时,若标志位 timedout 为1,则表示超时,此时调用 ngx_http_upstream_finalize_request 方法接收请求,并 return 从当前函数返回;
- 调用 ngx_http_upstream_process_non_bufferd_request 方法向下游服务器发送响应包体,此时第二个参数为 1;
/* buffering 标志位为0时,转发响应包体给下游服务器 */
static void
ngx_http_upstream_process_non_buffered_downstream(ngx_http_request_t *r)
{
ngx_event_t *wev;
ngx_connection_t *c;
ngx_http_upstream_t *u;
/* 获取Nginx与下游服务器之间的TCP连接 */
c = r->connection;
/* 获取ngx_http_upstream_t结构体 */
u = r->upstream;
/* 获取当前连接的写事件 */
wev = c->write;
ngx_log_debug0(NGX_LO
- 文章导航