首页 > 学院 > 开发设计 > 正文

网络编程定时器三:使用最小堆

2019-11-11 05:31:28
字体:
来源:转载
供稿:网友

前面讨论的定时方案都是以固定频率调用心搏函数tick,并在其中一次检测到期的定时器,然后执行到期定时器上的回调函数。设计定时器的另一中思路是,将所有定器超时时间最小的一个定时器的超时值作为心搏间隔。这样,一旦心搏函数tick执行,超时时间最小的定时器必然到期。我们就可以从剩余定时器中选出超时时间最小的一个,并将这个时间设为下一次心搏间隔。如此反复,就实现了较为精确的定时。

最小堆适合这种解决方案,下面直接给出最小堆方案的代码,并附有测试用例。不过测试用例我仍然只用了alarm来做测试。

#ifndef MIN_HEAP_H#define MIN_HEAP_H#include <iostream>#include <netinet/in.h>#include <time.h>#include <assert.h>#include <string.h>const int BUFFER_SIZE = 1024;class heap_timer;//绑定socket和定时器struct client_data { sockaddr_in addr_; int sockfd_; char buf_[BUFFER_SIZE]; heap_timer* timer_;};//定时器类class heap_timer {public: heap_timer(int delay) { PRintf("birth time %d, delay: %d/n", time(NULL), delay); expire_ = time(NULL) + delay; //注意,和之前的升序链表以及时间轮不同,这次我们在timer中初始化生效时间 } public: void (*timeout_callback_)(client_data*); //定时器的回调函数public: time_t expire_; //定时器生效的绝对时间 client_data* user_data_; //用户数据};class time_heap {public: time_heap(int cap) throw (std::exception) //构造函数之一,初始化一个大小为cap的空堆 : array_(new heap_timer*[cap]), capacity_(cap), cur_size_(0) { memset(array_, 0, sizeof(array_)); //初始化指针 } time_heap(heap_timer** init_array, int size, int capacity) throw (std::exception) //构造函数之二,使用已有数组来初始堆 : array_(new heap_timer*[capacity]), cur_size_(size), capacity_(capacity) { assert(capacity >= size); memset(array_, 0, sizeof(array_)); if(cur_size_ != 0){ for(int i=0; i<cur_size_; ++i) array_[i] = init_array[i]; //初始化堆数组 for(int i=((cur_size_-1)>>1); i>=0; --i) sift_down(i); //多数组中的(cur_size_-1)/2 ~ 0 个元素执行下滤操作 } } ~time_heap() { //销毁时间堆 for(int i=0; i<cur_size_; ++i) delete array_[i]; delete []array_; }public: //添加目标定时器timer void add_timer(heap_timer* timer) throw (std::exception) { assert(timer != NULL); if(cur_size_ >= capacity_) //如果当前堆数组容量不够,扩容 resize(); //新插入一个元素,当前堆大小加1,hole是新插入元素的位置 int hole = cur_size_++; //hole = cur_size_ - 1 int parent = 0; //对从新插入位置到根节点路径上所有节点进行上滤操作 for(; hole > 0; hole = parent){ parent = (hole - 1) >> 1; if(array_[parent]->expire_ <= timer->expire_) break; array_[hole] = array_[parent]; } array_[hole] = timer; } //调整,老规矩,先删除,再添加一次,否则旧定时器还作祟 void adjust_timer(heap_timer* old_timer, heap_timer* new_timer) { del_timer(old_timer); add_timer(new_timer); } //删除目标定时器 void del_timer(heap_timer* timer) { assert(timer != NULL); //仅仅将目标定时器的回调函数设置为空,即所谓的延迟销毁。这将节省真正删除该定时器造成的开销,但这样做容易使堆数组膨胀 //说下为什么延迟销毁效率高:如果不延迟我们需要查找O(lgn)+删除下滤调整O(lgn);而采用延迟删除,由于其他定时器可能触发,其他定时器会调用真正的删除函数pop_heap来删除自己,所以我们采用了延迟删除的定时器慢慢会被调整到堆顶,这时候由于时间到达,执行回调函数(我们已经赋空,tick函数中不会执行空回调函数)。由于为空,什么也不做。然后被pop_heap,实际上仅仅消耗删除调整的代价,无需查找操作,时间复杂度是O(lgn)。 timer->timeout_callback_ = NULL; } heap_timer* top() const { //获得堆顶定时器 return empty() ? NULL : array_[0]; } //删除堆顶定时器 void pop_timer() { if(empty()) //有可能目前时空的,什么也不做 return ; if(array_[0] != NULL){ printf("delete user: %d/n", array_[0]->user_data_->sockfd_); delete array_[0]; //将原来的堆顶元素替换为堆数组中最后一个元素,提高效率 array_[0] = array_[--cur_size_]; printf("cur_size_: %d/n", cur_size_); sift_down(0); //对新的堆顶元素执行下滤操作 } } //心搏函数 void tick() { heap_timer* tmp = array_[0]; time_t cur = time(NULL); //循环处理堆中到期的定时器 while(!empty()){ if(tmp == NULL) break; printf("current time is: %d/n", cur); if(tmp->expire_ > cur) //如果堆顶定时器没到期,则退出循环 break; if(array_[0]->timeout_callback_ != NULL) //否则执行没有被延迟删除的定时器的任务 array_[0]->timeout_callback_(array_[0]->user_data_); //将堆顶元素删除,同时生成新的堆顶定时器(array[0]) pop_timer(); tmp = array_[0]; } } bool empty() const { return cur_size_ == 0; }private: void sift_down(int hole) { //下滤操作 auto left_child = [] (int i) { return 2*i + 1; }; heap_timer* tmp = array_[hole]; int child = -1; //注意比较的是left_child(hole)而不是hole for(; left_child(hole)<cur_size_; hole=child){ child = left_child(hole); if(child < cur_size_-1 && array_[child]->expire_ > array_[child+1]->expire_) ++child; if(array_[child]->expire_ < tmp->expire_) array_[hole] = array_[child]; else break; } array_[hole] = tmp; } //扩容函数 void resize() throw (std::exception) { heap_timer** tmp = new heap_timer* [capacity_<<1]; memset(tmp, 0, sizeof(tmp)); capacity_ = capacity_ << 1; for(int i=0; i<cur_size_; ++i) tmp[i] = array_[i]; delete []array_; array_ = tmp; }private: heap_timer** array_; //堆数组 int capacity_; //堆数组的容量 int cur_size_; //堆数组当前包含的元素的个数};#endif

测试代码:

#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <signal.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <pthread.h>#include <memory>#include <vector>#include "min_heap.h"const int FD_LIMIT = 65535;const int MAX_EVENT_NUMBER = 1024;const int TIME_SLOT = 5;const int INIT_HEAP_SIZE = 2;static int pipefd[2];static int epollfd = 0;static std::shared_ptr<time_heap> timer_lst;int setnonblocking( int fd ){ int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option;}void addfd(int fd ){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd );}void sig_handler( int sig ){ int save_errno = errno; int msg = sig; send( pipefd[1], ( char* )&msg, 1, 0 ); errno = save_errno;}void addsig( int sig ){ struct sigaction sa; memset( &sa, '/0', sizeof( sa ) ); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset( &sa.sa_mask ); assert( sigaction( sig, &sa, NULL ) != -1 );}void timer_handler(){ timer_lst->tick(); alarm( TIME_SLOT );}void cb_func( client_data* user_data ){ epoll_ctl( epollfd, EPOLL_CTL_DEL, user_data->sockfd_, 0 ); assert( user_data ); close( user_data->sockfd_ ); printf( "close fd %d/n", user_data->sockfd_ );}int main( int argc, char* argv[] ){ if( argc <= 2 ) { printf( "usage: %s ip_address port_number/n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); int on = 1; ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); assert(ret != -1); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd(listenfd ); ret = socketpair( PF_UNIX, SOCK_STREAM, 0, pipefd ); assert( ret != -1 ); setnonblocking( pipefd[1] ); addfd(pipefd[0] ); // add all the interesting signals here addsig( SIGALRM ); addsig( SIGTERM ); bool stop_server = false; std::vector<client_data> users(FD_LIMIT); bool timeout = false; alarm( TIME_SLOT ); ////////////////////////////////////////////////////// timer_lst.reset(new time_heap(INIT_HEAP_SIZE)); ////////////////////////////////////////////////////// while( !stop_server ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf( "epoll failure/n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd(connfd); users[connfd].addr_ = client_address; users[connfd].sockfd_ = connfd; printf("user: %d/n", connfd); heap_timer *timer = new heap_timer(3 * TIME_SLOT); timer_lst->add_timer(timer); timer->user_data_ = &users[connfd]; timer->timeout_callback_ = cb_func; users[connfd].timer_ = timer; } else if( ( sockfd == pipefd[0] ) && ( events[i].events & EPOLLIN ) ) { int sig; char signals[1024]; ret = recv( pipefd[0], signals, sizeof( signals ), 0 ); if( ret == -1 ) { // handle the error continue; } else if( ret == 0 ) { continue; } else { for( int i = 0; i < ret; ++i ) { switch( signals[i] ) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; } } } } } else if( events[i].events & EPOLLIN ) { memset( users[sockfd].buf_, '/0', BUFFER_SIZE ); ret = recv( sockfd, users[sockfd].buf_, BUFFER_SIZE-1, 0 ); printf( "get %d bytes of client data %s from %d/n", ret, users[sockfd].buf_, sockfd ); heap_timer* timer = users[sockfd].timer_; if( ret < 0 ) { if( errno != EAGAIN ) { cb_func( &users[sockfd] ); if( timer ) { timer_lst->del_timer( timer ); } } } else if( ret == 0 ) { cb_func( &users[sockfd] ); if( timer ) { timer_lst->del_timer( timer ); } } else { //send( sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0 ); if( timer ) { printf("user: %d/n", sockfd); heap_timer* new_timer = new heap_timer(3*TIME_SLOT); timer_lst->adjust_timer(timer, new_timer); new_timer->user_data_ = &users[sockfd]; new_timer->timeout_callback_ = cb_func; users[sockfd].timer_ = new_timer; } } } else { // others } } if( timeout ) { timer_handler(); timeout = false; } } close( listenfd ); close( epollfd ); close( pipefd[1] ); close( pipefd[0] ); return 0;}

对时间堆而言,添加一个定时器的时间复杂度是O(lgn),删除一个定时器的时间复杂度是O(1)(采用了延迟删除),执行一个定时器的时间复杂度是O(1)。因此,时间堆的效率是很高的。


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表