首页 > 网站 > Nginx > 正文

Nginx学习笔记之事件驱动框架处理流程

2024-08-30 12:26:57
字体:
来源:转载
供稿:网友

ngx_event_core_module模块的ngx_event_process_init方法对事件模块做了一些初始化。其中包括将“请求连接”这样一个读事件对应的处理方法(handler)设置为ngx_event_accept函数,并将此事件添加到epoll模块中。当有新连接事件发生时,ngx_event_accept就会被调用。大致流程是这样:

worker进程在ngx_worker_process_cycle方法中不断循环调用ngx_process_events_and_timers函数处理事件,这个函数是事件处理的总入口。

ngx_process_events_and_timers会调用ngx_process_events,这是一个宏,相当于ngx_event_actions.process_events,ngx_event_actions是个全局的结构体,存储了对应事件驱动模块(这里是epoll模块)的10个函数接口。所以这里就是调用了ngx_epoll_module_ctx.actions.process_events函数,也就是ngx_epoll_process_events函数来处理事件。

ngx_epoll_process_events调用Linux函数接口epoll_wait获得“有新连接”这个事件,然后调用这个事件的handler处理函数来对这个事件进行处理。

在上面已经说过handler已经被设置成了ngx_event_accept函数,所以就调用ngx_event_accept进行实际的处理。

下面分析ngx_event_accept方法,它的流程图如下所示:

经过精简的代码如下,注释中的序号对应上图的序号:

voidngx_event_accept(ngx_event_t *ev){ socklen_t  socklen; ngx_err_t  err; ngx_log_t  *log; ngx_uint_t  level; ngx_socket_t  s; ngx_event_t  *rev, *wev; ngx_listening_t  *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; u_char  sa[NGX_SOCKADDRLEN];  if (ev->timedout) {  if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {   return;  }   ev->timedout = 0; }  ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);  if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {  ev->available = 1;  } else if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {  ev->available = ecf->multi_accept; }  lc = ev->data; ls = lc->listening; ev->ready = 0;  do {  socklen = NGX_SOCKADDRLEN;   /* 1、accept方法试图建立连接,非阻塞调用 */  s = accept(lc->fd, (struct sockaddr *) sa, &socklen);   if (s == (ngx_socket_t) -1)  {   err = ngx_socket_errno;    if (err == NGX_EAGAIN)   {    /* 没有连接,直接返回 */    return;   }    level = NGX_LOG_ALERT;    if (err == NGX_ECONNABORTED) {    level = NGX_LOG_ERR;    } else if (err == NGX_EMFILE || err == NGX_ENFILE) {    level = NGX_LOG_CRIT;   }    if (err == NGX_ECONNABORTED) {    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {     ev->available--;    }     if (ev->available) {     continue;    }   }    if (err == NGX_EMFILE || err == NGX_ENFILE) {    if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)     != NGX_OK)    {     return;    }     if (ngx_use_accept_mutex) {     if (ngx_accept_mutex_held) {      ngx_shmtx_unlock(&ngx_accept_mutex);      ngx_accept_mutex_held = 0;     }      ngx_accept_disabled = 1;     } else {     ngx_add_timer(ev, ecf->accept_mutex_delay);    }   }    return;  }   /* 2、设置负载均衡阈值 */  ngx_accept_disabled = ngx_cycle->connection_n / 8        - ngx_cycle->free_connection_n;   /* 3、从连接池获得一个连接对象 */  c = ngx_get_connection(s, ev->log);   /* 4、为连接创建内存池 */  c->pool = ngx_create_pool(ls->pool_size, ev->log);   c->sockaddr = ngx_palloc(c->pool, socklen);   ngx_memcpy(c->sockaddr, sa, socklen);   log = ngx_palloc(c->pool, sizeof(ngx_log_t));   /* set a blocking mode for aio and non-blocking mode for others */  /* 5、设置套接字属性为阻塞或非阻塞 */  if (ngx_inherited_nonblocking) {   if (ngx_event_flags & NGX_USE_AIO_EVENT) {    if (ngx_blocking(s) == -1) {     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,         ngx_blocking_n " failed");     ngx_close_accepted_connection(c);     return;    }   }   } else {   if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {    if (ngx_nonblocking(s) == -1) {     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,         ngx_nonblocking_n " failed");     ngx_close_accepted_connection(c);     return;    }   }  }   *log = ls->log;   c->recv = ngx_recv;  c->send = ngx_send;  c->recv_chain = ngx_recv_chain;  c->send_chain = ngx_send_chain;   c->log = log;  c->pool->log = log;   c->socklen = socklen;  c->listening = ls;  c->local_sockaddr = ls->sockaddr;  c->local_socklen = ls->socklen;   c->unexpected_eof = 1;   rev = c->read;  wev = c->write;   wev->ready = 1;   if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {   /* rtsig, aio, iocp */   rev->ready = 1;  }   if (ev->deferred_accept) {   rev->ready = 1;   }   rev->log = log;  wev->log = log;   /*   * TODO: MT: - ngx_atomic_fetch_add()   *  or protection by critical section or light mutex   *   * TODO: MP: - allocated in a shared memory   *   - ngx_atomic_fetch_add()   *  or protection by critical section or light mutex   */   c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);   if (ls->addr_ntop) {   c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);   if (c->addr_text.data == NULL) {    ngx_close_accepted_connection(c);    return;   }    c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,            c->addr_text.data,            ls->addr_text_max_len, 0);   if (c->addr_text.len == 0) {    ngx_close_accepted_connection(c);    return;   }  }   /* 6、将新连接对应的读写事件添加到epoll对象中 */  if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {   if (ngx_add_conn(c) == NGX_ERROR) {    ngx_close_accepted_connection(c);    return;   }  }   log->data = NULL;  log->handler = NULL;   /* 7、TCP建立成功调用的方法,这个方法在ngx_listening_t结构体中 */  ls->handler(c);  } while (ev->available); /* available标志表示一次尽可能多的建立连接,由配置项multi_accept决定 */}            
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表