Hi!请登陆

源码分析 Nginx 数据接收流程详解

2020-3-29 88 3/29

        在nginx基于epoll模型事件驱动流程详解中我们讲到,epoll在触发accept事件之后,会回调ngx_event_accept()方法。这个方法主要做了两件事:

  • 获取accept到的客户端连接句柄,并且初始化一个ngx_connection_t结构体,用以表征这个连接;
  • 检查新的连接是否存在可以读取的数据,如果有,则读取并处理数据,否则将当前连接句柄添加到epoll框架中,以监听其可读事件。

上面的第一个步骤在nginx基于epoll模型事件驱动流程详解已经做了详细讲解,这里我们主要讲解第二个步骤是如何实现的。在ngx_event_accept()方法的最后,其会调用如下一段代码:

void ngx_event_accept(ngx_event_t *ev) {
  do {
    // 省略...
    
    // 建立新连接之后的回调方法,这个方法是在ngx_http_block()方法中解析http配置块的最后
    // ngx_http_optimize_servers()方法中进行初始化的,其指向的是ngx_http_init_connection()方法
    ls->handler(c);

    // 省略...

  } while (ev->available);
}

这里的ls->handler()方法指向的是ngx_http_init_connection(),顾名思义,这个方法是初始化当前新生成的ngx_connection_t连接的。这个方法的本质作用是对客户端连接的事件驱动进行初始化的一个入口方法,如下是该方法的源码:

/**
 * 当前方法存储在ngx_listening_t的handler属性中,其调用位置主要在ngx_event_accept()方法中,
 * 也就是说,每当接收到一个新的客户端请求的时候,ngx_event_accept()方法就会调用当前方法来初始化连接
 */
void ngx_http_init_connection(ngx_connection_t *c) {
  ngx_uint_t i;
  ngx_event_t *rev;
  struct sockaddr_in *sin;
  ngx_http_port_t *port;
  ngx_http_in_addr_t *addr;
  ngx_http_log_ctx_t *ctx;
  ngx_http_connection_t *hc;
#if (NGX_HAVE_INET6)
  struct sockaddr_in6 *sin6;
  ngx_http_in6_addr_t *addr6;
#endif

  hc = ngx_pcalloc(c->pool, sizeof(ngx_http_connection_t));
  if (hc == NULL) {
    ngx_http_close_connection(c);
    return;
  }

  c->data = hc;
  port = c->listening->servers;

  // naddrs表示监听当前端口的server的数量
  // 这里的if-else分支的主要作用是找到与当前请求相匹配的server配置,
  // 然后将其赋值给ngx_http_connection_t结构体的addr_conf属性
  if (port->naddrs > 1) {
    // 这里主要是获取客户端的ip地址
    if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
      ngx_http_close_connection(c);
      return;
    }
    
    switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
      case AF_INET6:
        sin6 = (struct sockaddr_in6 *) c->local_sockaddr;
        addr6 = port->addrs;
        for (i = 0; i < port->naddrs - 1; i++) {
          // 将获取到的客户端地址与配置的各个地址进行比较,找到匹配的那一个
          if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
            break;
          }
        }

        // 将找到的对应的server配置赋值给ngx_http_connection_t结构体的addr_conf属性
        hc->addr_conf = &addr6[i].conf;
        break;
#endif

      default: /* AF_INET */
        sin = (struct sockaddr_in *) c->local_sockaddr;
        addr = port->addrs;
        for (i = 0; i < port->naddrs - 1; i++) {
          // 将获取到的客户端地址与配置的各个地址进行比较,找到匹配的那一个
          if (addr[i].addr == sin->sin_addr.s_addr) {
            break;
          }
        }

        // 将找到的对应的server配置赋值给ngx_http_connection_t结构体的addr_conf属性
        hc->addr_conf = &addr[i].conf;
        break;
    }
  } else {
    // 这里的else分支表示监听当前端口的server配置只有一个
    switch (c->local_sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
      case AF_INET6:
        addr6 = port->addrs;
        hc->addr_conf = &addr6[0].conf;
        break;
#endif
      default: /* AF_INET */
        addr = port->addrs;
        hc->addr_conf = &addr[0].conf;
        break;
    }
  }

  hc->conf_ctx = hc->addr_conf->default_server->ctx;
  ctx = ngx_palloc(c->pool, sizeof(ngx_http_log_ctx_t));
  if (ctx == NULL) {
    ngx_http_close_connection(c);
    return;
  }

  ctx->connection = c;
  ctx->request = NULL;
  ctx->current_request = NULL;
  
  c->log->connection = c->number;
  c->log->handler = ngx_http_log_error;
  c->log->data = ctx;
  c->log->action = "waiting for request";
  c->log_error = NGX_ERROR_INFO;

  rev = c->read;
  // 设置了当发生读事件时将要触发的处理方法
  rev->handler = ngx_http_wait_request_handler;
  // 在初始时,并不需要写入事件,因而如果写事件被意外触发了,则直接调用这里的空方法,该方法没有做任何事情
  c->write->handler = ngx_http_empty_handler;

  if (hc->addr_conf->proxy_protocol) {
    hc->proxy_protocol = 1;
    c->log->action = "reading PROXY protocol";
  }

  // 如果当前读事件已经准备就绪,则直接处理该事件
  if (rev->ready) {
    // 如果设置的通过共享锁来处理事件,则将当前读事件添加到ngx_posted_events队列中
    if (ngx_use_accept_mutex) {
      ngx_post_event(rev, &ngx_posted_events);
      return;
    }

    // 如果不是设置的通过共享锁来处理事件,则直接调用读事件的handler()方法处理该事件,
    // 这里的handler指向的是上面设置的ngx_http_wait_request_handler()方法
    rev->handler(rev);
    return;
  }

  // 走到这里说明读事件还未准备就绪,则将其添加到事件循环中
  ngx_add_timer(rev, c->listening->post_accept_timeout);
  ngx_reusable_connection(c, 1);

  // 在当前epoll句柄中添加当前事件的读监听器
  if (ngx_handle_read_event(rev, 0) != NGX_OK) {
    ngx_http_close_connection(c);
    return;
  }
}

上面ngx_http_init_connection()的整体逻辑比较简单,其主要做了如下几个工作:

  • 根据当前连接访问的端口找到配置了该端口的所有server配置块,然后根据虚拟主机的配置,与这些server进行比较,找到相匹配的server配置块;
  • 将当前连接的读事件回调函数设置为ngx_http_wait_request_handler()方法,这个方法也是读取数据的一个关键方法,后面会详细讲解;
  • 将当前连接的写事件的回调函数设置为ngx_http_empty_handler()方法,这个方法是一个空方法,这么做的主要原因在于防止意外触发的写事件,因为当前还处于读取数据的阶段,因而不需要处理写事件;
  • 判断当前读事件是否已经就绪了,如果已经就绪了,则直接调用rev->handler(rev)来触发一次读事件,这里的handler()就是第二步中设置的ngx_http_wait_request_handler()方法;
  • 如果当前读事件没有就绪,则将当前事件通过调用ngx_add_timer()添加到事件框架中,并且通过调用ngx_handle_read_event()方法在epoll句柄中为当前连接注册读事件。

关于这里比较关键的两个方法ngx_http_init_connection()ngx_http_wait_request_handler()的关系,这里需要着重说明一下。由于TCP是遵照滑动窗口协议来传输数据的,这也就是为什么称之为数据流的原因。也就是说,nginx服务器在接收客户端的数据的时候是一段一段来接收的,而具体每一段接收多少是不确定的。当每次有数据到达时,其就会触发epoll句柄上的读事件,以告知nginx服务器需要继续接收数据了。对应这里就是,首先ngx_http_init_connection()会为当前客户端连接在epoll句柄上注册读事件,而ngx_http_wait_request_handler()则是在读事件触发之后会不断回调的一个方法,不过这里ngx_http_init_connection()会首先检查一下读事件是否已经触发,如果已经触发,那么就直接读取连接上的数据,如果没有,才会注册读事件。下面我们来看一下ngx_http_wait_request_handler()方法的实现原理:

static void ngx_http_wait_request_handler(ngx_event_t *rev) {
  u_char *p;
  size_t size;
  ssize_t n;
  ngx_buf_t *b;
  ngx_connection_t *c;
  ngx_http_connection_t *hc;
  ngx_http_core_srv_conf_t *cscf;

  c = rev->data;

  // 如果读事件超时,则关闭该连接
  if (rev->timedout) {
    ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
    ngx_http_close_connection(c);
    return;
  }

  // 如果连接已经关闭,则尝试完整的关闭连接
  if (c->close) {
    ngx_http_close_connection(c);
    return;
  }

  hc = c->data;
  cscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_core_module);
  size = cscf->client_header_buffer_size;

  b = c->buffer;
  if (b == NULL) {
    // 缓冲区为空则初始化缓冲区
    b = ngx_create_temp_buf(c->pool, size);
    if (b == NULL) {
      ngx_http_close_connection(c);
      return;
    }

    c->buffer = b;
  } else if (b->start == NULL) {
    // 这里b->start为NULL,说明当前连接是NGX_AGAIN之后第二次触发的事件,
    // 因而这里会初始化当前连接的相关资源
    b->start = ngx_palloc(c->pool, size);
    if (b->start == NULL) {
      ngx_http_close_connection(c);
      return;
    }

    b->pos = b->start;
    b->last = b->start;
    b->end = b->last + size;
  }

  // 在ngx_event_accept()方法中会对c->recv进行设置(c->recv = ngx_recv;),这里的ngx_recv是一个宏,
  // 其值为ngx_io.recv,这里的ngx_io是在ngx_epoll_module进行初始化的时候赋值的,
  // 其指向的是ngx_os_io,而ngx_os_io的定义为:
  // ngx_os_io_t ngx_os_io = {
  //    ngx_unix_recv,
  //    ngx_readv_chain,
  //    ngx_udp_unix_recv,
  //    ngx_unix_send,
  //    ngx_udp_unix_send,
  //    ngx_udp_unix_sendmsg_chain,
  //    ngx_writev_chain,
  //    0
  // };
  // 也就是说这里的c->recv指向的是ngx_os_io.recv属性,也即ngx_unix_recv()方法
  // 当前方法的主要作用是读取套接字中的数据到当前缓冲区中
  n = c->recv(c, b->last, size);

  // NGX_AGAIN表示此次读取数据是第一次读取,此时连接已经正常建立了,但是没有数据到来,
  // 因而这里只是继续监听当前连接的读事件,并且释放申请的资源,等数据真正到来时在申请
  if (n == NGX_AGAIN) {

    // 如果当前事件不在事件队列中,则将其添加进去
    if (!rev->timer_set) {
      ngx_add_timer(rev, c->listening->post_accept_timeout);
      ngx_reusable_connection(c, 1);
    }

    // 将读事件添加到epoll句柄中,以继续监听读事件
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
      ngx_http_close_connection(c);
      return;
    }

    // 由于此次并没有读取到数据,因而释放当前缓冲区,在下次调用当前方法时,会检查b->start是否为空,
    // 如果为空,则会重新进行初始化。这里进行释放的主要目的是供给其他的请求使用
    if (ngx_pfree(c->pool, b->start) == NGX_OK) {
      b->start = NULL;
    }

    return;
  }

  // 如果读取数据失败,则关闭连接
  if (n == NGX_ERROR) {
    ngx_http_close_connection(c);
    return;
  }

  // 如果读取到的数据长度为0,说明客户端关闭了连接,因而这里关闭connection结构体
  if (n == 0) {
    ngx_log_error(NGX_LOG_INFO, c->log, 0, "client closed connection");
    ngx_http_close_connection(c);
    return;
  }

  // 更新读取到的数据长度
  b->last += n;
  
  if (hc->proxy_protocol) {
    hc->proxy_protocol = 0;
    p = ngx_proxy_protocol_read(c, b->pos, b->last);
    if (p == NULL) {
      ngx_http_close_connection(c);
      return;
    }

    b->pos = p;
    if (b->pos == b->last) {
      c->log->action = "waiting for request";
      b->pos = b->start;
      b->last = b->start;
      ngx_post_event(rev, &ngx_posted_events);
      return;
    }
  }

  c->log->action = "reading client request line";
  ngx_reusable_connection(c, 0);

  // 走到这里说明已经读取到客户端的数据了,因而开始创建ngx_http_request_s结构体以表征当前的请求
  c->data = ngx_http_create_request(c);
  if (c->data == NULL) {
    ngx_http_close_connection(c);
    return;
  }

  // 将当前事件的handler指向ngx_http_process_request_line()方法
  // ngx_http_process_request_line()方法的主要作用是解析完整的请求行
  // 需要注意的是,走到这里,说明当前已经读取到一部分客户端的数据了,但是并不一定读取完整了
  rev->handler = ngx_http_process_request_line;
  ngx_http_process_request_line(rev);
}

ngx_http_wait_request_handler()既然是用于不断接收客户端数据的,那么其必然有一个缓冲区用于存储数据,这个缓冲区就是c->buffer,这个字段的类型是ngx_buf_tngx_http_wait_request_handler()方法主要完成了如下几部分的工作:

  • 检查c->buffer是否初始化了,如果没有,则进行初始化。在初始化的时候,分为两种情况:
    • 当前缓冲区还未初始化,则直接申请内存空间;
    • 当前缓冲区已经初始化,不过c->buffer->start为空,产生这种情况的原因是在后面的if判断中,如果读取数据的返回值为NGX_AGAIN,说明此次是第一次触发读事件,此时连接建立了,但是客户端还未发送数据过来,因而这个时候会把申请的内存给释放掉,也即c->buffer->start被释放了,也就导致了这个属性为NULL;
  • 调用c->recv(c, b->last, size)从连接句柄上读取数据,返回值如果大于0,则表示读取的数据长度;如果等于0,则表示客户端断开了连接;如果为-1,则表示读取数据发生异常了;如果为-2,则表示需要再次读取;
  • 依次检查读取数据的返回值,如果不为正数,则根据具体情况进行不同的处理;
  • 走到这一步,说明数据读取成功,此时就会调用ngx_http_create_request(c)方法创建一个ngx_http_request_s结构体,用于表征当前的请求;
  • 将读事件的回调方法设置为ngx_http_process_request_line(),并且触发一次该方法的调用。这个方法的主要作用是读取数据,并且解析客户端数据中的请求行,如”GET /index HTTP/1.1″;

本文主要对nginx是如何接收客户端的数据的流程进行了讲解,着重说明了从接收到客户端请求,然后建立连接,接收客户端数据,最后处理数据的整体流程。

Tag:

相关推荐