首页 > 编程 > Python > 正文

python3学习笔记之多进程分布式小例子

2020-01-04 15:50:49
字体:
来源:转载
供稿:网友

最近一直跟着廖大在学Python,关于分布式进程的小例子挺有趣的,这里做个记录。

分布式进程

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

master服务端原理:通过managers模块把Queue通过网络暴露出去,其他机器的进程就可以访问Queue了
服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务,代码如下:

#task_master.py#coding=utf-8#多进程分布式例子#服务器端from multiprocessing.managers import BaseManagerfrom multiprocessing import freeze_support #server启动报错,提示需要引用此包import random,time,queue#发送任务的队列task_queue = queue.Queue()#接收结果的队列result_queue = queue.Queue()#从BaseManager继承的QueueManagerclass QueueManager(BaseManager):  pass#win7 64 貌似不支持callable下调用匿名函数lambda,这里封装一下def return_task_queue():  global task_queue  return task_queuedef return_result_queue():  global result_queue  return result_queuedef test():  #把两个Queue注册到网络上,callable参数关联了Queue对象  #QueueManager.register('get_task_queue',callable=lambda:task_queue)  #QueueManager.register('get_result_queue',callable=lambda:result_queue)  QueueManager.register('get_task_queue',callable=return_task_queue)  QueueManager.register('get_result_queue',callable=return_result_queue)  #绑定端口5000,设置验证码‘abc'  manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')#这里必须加上本地默认ip地址127.0.0.1  #启动Queue  manager.start()  #server = manager.get_server()  #server.serve_forever()  print('start server master')  #获得通过网络访问的Queue对象  task = manager.get_task_queue()  result = manager.get_result_queue()  #放几个任务进去  for i in range(10):    n = random.randint(0,10000)    print('put task %d...' % n)    task.put(n)  #从result队列读取结果  print('try get results...')  for i in range(10):    r = result.get(timeout=10)    print('result:%s' % r)  #关闭  manager.shutdown()  print('master exit')if __name__ == '__main__':  freeze_support()  test()

运行截图如下:

python3,多进程,分布式

在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

任务进程,代码如下:

#task_worker.py#coding=utf-8#多进程分布式例子#非服务端:workerimport time,sys,queuefrom multiprocessing.managers import BaseManager#创建类似的QueueManagerclass QueueManager(BaseManager):  pass#由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字即可QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')#连接到服务器,也就是运行task_master.py的机器server_addr = '127.0.0.1'print('connect to server %s...'% server_addr)#端口和验证码注意要保持完全一致m = QueueManager(address=(server_addr,5000),authkey=b'abc')#从网络连接m.connect()#获取Queue的对象task = m.get_task_queue()result = m.get_result_queue()#从task队列获取任务,并把结果写入result队列for i in range(10):  try:    n = task.get(timeout=1)    print('run task %d * %d...'% (n,n))    r = '%d * %d = %d' % (n,n,n*n)    time.sleep(1)    result.put(r)  except queue.Empty:    print('task queue is empty')#处理结果print('worker exit')

 运行截图如下:

python3,多进程,分布式

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VEVB武林网。


注:相关教程知识阅读请移步到python教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表