本文为大家分享了threadpool线程池中所有的操作,供大家参考,具体内容如下
首先介绍一下自己使用到的名词:
工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务;
任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过 makeRequests来创建
任务队列(request_queue):存放任务的队列,使用了queue实现的。工作线程从任务队列中get任务进行处理;
任务处理函数(callable):工作线程get到任务后,通过调用任务的任务处理函数即(request.callable_)具体 的 处理任务,并返回处理结果;
任务结果队列(result_queue):任务处理完成后,将返回的处理结果,放入到任务结果队列中(包括异常);
任务异常处理函数或回调(exc_callback):从任务结果队列中get结果,如果设置了异常,则需要调用异常回调处理异常;
任务结果回调(callback):从任务结果队列中get结果,对result进行进一步处理;
上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:
(1)线程池的创建
(2)工作线程的启动
(3)任务的创建
(4)任务的推送到线程池
(5)线程处理任务
(6)任务结束处理
(7)工作线程的退出
下面是threadpool的定义:
class ThreadPool: """A thread pool, distributing work requests and collecting results. See the module docstring for more information. """ def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): pass def createWorkers(self, num_workers, poll_timeout=5): pass def dismissWorkers(self, num_workers, do_join=False): pass def joinAllDismissedWorkers(self): pass def putRequest(self, request, block=True, timeout=None): pass def poll(self, block=False): pass def wait(self): pass
1、线程池的创建(ThreadPool(args))
task_pool=threadpool.ThreadPool(num_works)
task_pool=threadpool.ThreadPool(num_works) def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): """Set up the thread pool and start num_workers worker threads. ``num_workers`` is the number of worker threads to start initially. If ``q_size > 0`` the size of the work *request queue* is limited and the thread pool blocks when the queue is full and it tries to put more work requests in it (see ``putRequest`` method), unless you also use a positive ``timeout`` value for ``putRequest``. If ``resq_size > 0`` the size of the *results queue* is limited and the worker threads will block when the queue is full and they try to put new results in it. .. warning: If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is the possibilty of a deadlock, when the results queue is not pulled regularly and too many jobs are put in the work requests queue. To prevent this, always set ``timeout > 0`` when calling ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. """ self._requests_queue = Queue.Queue(q_size)#任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中 self._results_queue = Queue.Queue(resq_size)#字典,任务对应的任务执行结果</span> self.workers = []#工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中 self.dismissedWorkers = []#被设置线程事件并且没有被join的工作线程 self.workRequests = {}#字典,记录任务被分配到哪个工作线程中</span> self.createWorkers(num_workers, poll_timeout)
新闻热点
疑难解答