首页 > 编程 > C > 正文

C语言实现支持动态拓展和销毁的线程池

2020-01-26 14:46:44
字体:
来源:转载
供稿:网友

本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下

实现功能

  • 1.初始化指定个数的线程
  • 2.使用链表来管理任务队列
  • 3.支持拓展动态线程
  • 4.如果闲置线程过多,动态销毁部分线程
#include <stdio.h>#include <pthread.h>#include <stdlib.h>#include <signal.h> /*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/typedef struct thread_worker_s{  void *(*process)(void *arg); //处理函数  void *arg;          //参数  struct thread_worker_s *next;}thread_worker_t; #define bool int#define true 1#define false 0 /*线程池中各线程状态描述*/#define THREAD_STATE_RUN        0#define THREAD_STATE_TASK_WAITING   1#define THREAD_STATE_TASK_PROCESSING  2#define THREAD_STATE_TASK_FINISHED   3#define THREAD_STATE_EXIT       4     typedef struct thread_info_s{  pthread_t id;  int    state;   struct thread_info_s *next;}thread_info_t; static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*/  /*线程池管理器*/#define THREAD_BUSY_PERCENT 0.5  /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/#define THREAD_IDLE_PERCENT 2   /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/ typedef struct thread_pool_s{  pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁  pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了   thread_worker_t *head   ;    //任务队列头指针  bool    is_destroy   ;    //线程池是否已经销毁  int num;              //线程的个数  int rnum;         ;    //正在跑的线程  int knum;         ;    //已杀死的线程  int queue_size       ;    //工作队列的大小   thread_info_t *threads   ;    //线程组id,通过pthread_join(thread_ids[0],NULL) 来执行线程  pthread_t   display   ;    //打印线程  pthread_t   destroy   ;    //定期销毁线程的线程id  pthread_t   extend    ;  float percent       ;    //线程个数于任务的比例 rnum/queue_size  int  init_num      ;  pthread_cond_t  extend_ready     ;    //如果要增加线程}thread_pool_t; /*-------------------------函数声明----------------------*//** * 1.初始化互斥变量 * 2.初始化等待变量 * 3.创建指定个数的线程线程 */thread_pool_t* thread_pool_create(int num);void *thread_excute_route(void *arg);  /*调试函数*/void debug(char *message,int flag){  if(flag)    printf("%s/n",message);} void *display_thread(void *arg);/** * 添加任务包括以下几个操作 * 1.将任务添加到队列末尾 * 2.通知等待进程来处理这个任务 pthread_cond_singal();*/int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务 /** * 销毁线程池,包括以下几个部分 * 1.通知所有等待的进程 pthread_cond_broadcase * 2.等待所有的线程执行完 * 3.销毁任务列表 * 4.释放锁,释放条件 * 4.销毁线程池对象 */   void *thread_pool_is_need_recovery(void *arg);void *thread_pool_is_need_extend(void *arg);void thread_pool_destory(thread_pool_t *pool);  thread_pool_t *thread_pool_create(int num){  if(num<1){    return NULL;  }  thread_pool_t *p;  p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));  if(p==NULL)    return NULL;  p->init_num = num;  /*初始化互斥变量与条件变量*/  pthread_mutex_init(&(p->queue_lock),NULL);  pthread_cond_init(&(p->queue_ready),NULL);   /*设置线程个数*/  p->num  = num;   p->rnum = num;  p->knum = 0;   p->head = NULL;  p->queue_size =0;   p->is_destroy = false;      int i=0;  thread_info_t *tmp=NULL;  for(i=0;i<num;i++){    /*创建线程*/    tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));    if(tmp==NULL){      free(p);      return NULL;    }else{      tmp->next = p->threads;      p->threads = tmp;    }    pthread_create(&(tmp->id),NULL,thread_excute_route,p);    tmp->state = THREAD_STATE_RUN;  }   /*显示*/  pthread_create(&(p->display),NULL,display_thread,p);  /*检测是否需要动态线程*/  //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);  /*动态销毁*/  pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);  return p;} int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){  thread_pool_t *p= pool;  thread_worker_t *worker=NULL,*member=NULL;  worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));  int incr=0;  if(worker==NULL){    return -1;  }  worker->process = process;  worker->arg   = arg;  worker->next  = NULL;  thread_pool_is_need_extend(pool);  pthread_mutex_lock(&(p->queue_lock));  member = p->head;  if(member!=NULL){    while(member->next!=NULL)      member = member->next;    member->next = worker;  }else{    p->head = worker;  }  p->queue_size ++;  pthread_mutex_unlock(&(p->queue_lock));  pthread_cond_signal(&(p->queue_ready));  return 1;}  void thread_pool_wait(thread_pool_t *pool){  thread_info_t *thread;  int i=0;  for(i=0;i<pool->num;i++){    thread = (thread_info_t*)(pool->threads+i);    thread->state = THREAD_STATE_EXIT;    pthread_join(thread->id,NULL);  }}void thread_pool_destory(thread_pool_t *pool){  thread_pool_t  *p   = pool;  thread_worker_t *member = NULL;   if(p->is_destroy)    return ;  p->is_destroy = true;  pthread_cond_broadcast(&(p->queue_ready));  thread_pool_wait(pool);  free(p->threads);  p->threads = NULL;  /*销毁任务列表*/  while(p->head){    member = p->head;    p->head = member->next;    free(member);  }  /*销毁线程列表*/  thread_info_t *tmp=NULL;  while(p->threads){    tmp = p->threads;    p->threads = tmp->next;    free(tmp);  }   pthread_mutex_destroy(&(p->queue_lock));  pthread_cond_destroy(&(p->queue_ready));  return ;}/*通过线程id,找到对应的线程*/thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){  thread_info_t *thread=NULL;  thread_info_t *p=pool->threads;  while(p!=NULL){    if(p->id==id)      return p;    p = p->next;  }  return NULL;}  /*每个线程入口函数*/void *thread_excute_route(void *arg){  thread_worker_t *worker = NULL;  thread_info_t  *thread = NULL;   thread_pool_t*  p = (thread_pool_t*)arg;  //printf("thread %lld create success/n",pthread_self());  while(1){    pthread_mutex_lock(&(p->queue_lock));     /*获取当前线程的id*/    pthread_t pthread_id = pthread_self();    /*设置当前状态*/    thread = get_thread_by_id(p,pthread_id);     /*线程池被销毁,并且没有任务了*/    if(p->is_destroy==true && p->queue_size ==0){      pthread_mutex_unlock(&(p->queue_lock));      thread->state = THREAD_STATE_EXIT;      p->knum ++;      p->rnum --;      pthread_exit(NULL);    }    if(thread){      thread->state = THREAD_STATE_TASK_WAITING; /*线程正在等待任务*/    }    /*线程池没有被销毁,没有任务到来就一直等待*/    while(p->queue_size==0 && !p->is_destroy){      pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));    }    p->queue_size--;    worker = p->head;    p->head = worker->next;    pthread_mutex_unlock(&(p->queue_lock));          if(thread)      thread->state = THREAD_STATE_TASK_PROCESSING; /*线程正在执行任务*/    (*(worker->process))(worker->arg);    if(thread)      thread->state = THREAD_STATE_TASK_FINISHED;  /*任务执行完成*/    free(worker);    worker = NULL;  }}   /*拓展线程*/void *thread_pool_is_need_extend(void *arg){  thread_pool_t *p = (thread_pool_t *)arg;  thread_pool_t *pool = p;  /*判断是否需要增加线程,最终目的 线程:任务=1:2*/  if(p->queue_size>100){    int incr =0;    if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){      incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*计算需要增加线程个数*/      int i=0;      thread_info_t *tmp=NULL;      thread_pool_t *p = pool;      pthread_mutex_lock(&pool->queue_lock);      if(p->queue_size<100){        pthread_mutex_unlock(&pool->queue_lock);        return ;      }      for(i=0;i<incr;i++){        /*创建线程*/        tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s));        if(tmp==NULL){          continue;        }else{          tmp->next = p->threads;          p->threads = tmp;        }        p->num ++;        p->rnum ++;        pthread_create(&(tmp->id),NULL,thread_excute_route,p);        tmp->state = THREAD_STATE_RUN;      }      pthread_mutex_unlock(&pool->queue_lock);    }  }  //pthread_cond_signal(&pool->extend_ready);}pthread_cond_t sum_ready;/*恢复初始线程个数*/void *thread_pool_is_need_recovery(void *arg){  thread_pool_t *pool = (thread_pool_t *)arg;  int i=0;  thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;  /*如果没有任务了,当前线程大于初始化的线程个数*/  while(1){    i=0;    if(pool->queue_size==0 && pool->rnum > pool->init_num ){      sleep(5);      /*5s秒内还是这个状态的话就,销毁部分线程*/      if(pool->queue_size==0 && pool->rnum > pool->init_num ){        pthread_mutex_lock(&pool->queue_lock);        tmp = pool->threads;        while((pool->rnum != pool->init_num) && tmp){          /*找到空闲的线程*/          if(tmp->state != THREAD_STATE_TASK_PROCESSING){            i++;            if(prev)              prev->next  = tmp->next;            else              pool->threads = tmp->next;            pool->rnum --; /*正在运行的线程减一*/            pool->knum ++; /*销毁的线程加一*/            kill(tmp->id,SIGKILL); /*销毁线程*/            p1 = tmp;            tmp = tmp->next;            free(p1);            continue;          }          prev = tmp;          tmp = tmp->next;        }        pthread_mutex_unlock(&pool->queue_lock);        printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程/n",i);      }    }    sleep(5);  }}   /*打印一些信息的*/void *display_thread(void *arg){  thread_pool_t *p =(thread_pool_t *)arg;  thread_info_t *thread = NULL;  int i=0;  while(1){    printf("threads %d,running %d,killed %d/n",p->num,p->rnum,p->knum);  /*线程总数,正在跑的,已销毁的*/    thread = p->threads;    while(thread){      printf("id=%ld,state=%s/n",thread->id,thread_state_map[thread->state]);      thread = thread->next;    }    sleep(5);  }}

希望本文所述对大家学习C语言程序设计有所帮助。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表

图片精选