Python分布式进程使用(Queue和BaseManager使用)

Python分布式进程使⽤(Queue和BaseManager使⽤)分布式进程
需要模块
multiprocessing和queue模块
使⽤BaseManager创建分布式管理器
使⽤Queue创建队列,⽤于多个进程之间的通信
分布式进程原理
managers⼦模块⽀持把多个进程分布到多台机器上
可以写⼀个服务进程作为调度者,将任务分布到其它多个进程中,然后通过⽹络通信进⾏管理
⽐如爬取图⽚:⼀般⼀个进程负责抓取图⽚的地址,将地址放在Queue(容器)队列中
另外⼀个进程负责从Queue队列中取出链接地址进⾏图⽚下载和存储到到本地
上述爬取图⽚的过程就可以做成分布式,⼀台机器负责获取链接,另外⼀台机器负责下载存储
上述问题核⼼:将Queue队列暴露到⽹络中,让其他机器可以访问
分布式进程实现步骤
建⽴Queue队列,负责进程之间的通信,任务队列task_queue,结果队列result_queue
把第⼀步中的两个队列在⽹络中注册,注册时候将队列重新命名
创建⼀个Queuemanager(BaseManager)的实例manager,相当于⼀个服务器,给定IP地址、端⼝和验证码启动实例manager
访问Queue对象,即创建⽹络中暴露重命名后的Queue实例
创建任务到本地队列中,⾃动上传任务到⽹络队列中,分配给任务进程进⾏处理
任务进程先从⽹络中任务队列中取出任务,然后执⾏,将执⾏结果放⼊到⽹络中的结果队列中
服务进程从结果队列中取出结果,直到执⾏完所有任务和取出所有的结果,任务进程关闭,然后服务进⾏关闭
注意
先创建服务进程,再创建任务进程
执⾏时,先启动服务进程,在创建任务进程,启动任务进程不要超过服务进程取出结果的等待时间
分布式进程实例
创建⼀个分布式进程,⽤来完成10次乘法任务
服务进程:
# 服务进程在windows系统和Linux系统上有所不同
# 创建⼀个分布式进程:包括服务进程和任务进程
# 多个进程之间的通信使⽤Queue
# 该代码为服务进程
# 注意,运⾏时先运⾏服务进程,再运⾏任务进程
# 任务执⾏循序:
# 服务进程和任务进⾏都创建了相同的两个队列,⼀个⽤来放任务,⼀个⽤来放结果
# 第⼀步:服务进程运⾏,⽐如将数字2放进任务队列,任务进程从任务队列中取出数字2
# 第⼆步:取出数字,执⾏任务,就是2*2=4, 任务执⾏完后,放进结果队列
# 第三步:服务进程从结果队列中,取出结果。
# 第四步:所有任务执⾏完毕,所有结果都已经取出,最终任务队列和结果队列都是空的了
# -*- coding:utf-8 -*-
import random, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
简易车棚
# 第⼀步:定义两个Queue队列,⼀个⽤于发送任务,⼀个接收结果
task_queue = queue.Queue()
result_queue = queue.Queue()
# 创建类似的QueueManager,继承BaseManager,⽤于后⾯创建管理器
class QueueManager(BaseManager):
pass
# 定义两个函数,返回结果就是Queue队列
def return_task_queue():
global task_queue # 定义成全局变量
return task_queue # 返回发送任务的队列
def return_result_queue():
global result_queue
return result_queue # 返回接收结果的队列
# 第⼆步:把上⾯创建的两个队列注册在⽹络上,利⽤register⽅法
# callable参数关联了Queue对象,将Queue对象在⽹络中暴露
# 第⼀个参数是注册在⽹络上队列的名称
def test():
# 第三步:绑定端⼝8001,设置验证⼝令,这个相当于对象的初始化
# 绑定端⼝并填写验证⼝令,windows下需要填写IP地址,Linux下默认为本地,地址为空
manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'abc') # ⼝令必须写成类似b'abc'形式,只写'abc'运⾏错误
# 第四步:启动管理器,启动Queue队列,监听信息通道
manager.start()
# 第五步:通过管理实例的⽅法获访问⽹络中的Queue对象
# 即通过⽹络访问获取任务队列和结果队列,创建了两个Queue实例,
task = _task_queue()
5b5b5b5b
result = _result_queue()
# 第六步:添加任务,获取返回的结果
# 将任务放到Queue队列中
for i in range(10):
n = random.randint(0, 10) # 返回0到10之间的随机数
print("Put task %s ..." % n)
task.put(n) # 将n放⼊到任务队列中
# 从结果队列中取出结果
print("Try ")
for i in range(11): # 注意,这⾥结果队列中取结果设置为11次,总共只有10个任务和10个结果,第10次⽤量确认队列中是不是已经空了        # 总共循环10次,上⾯放⼊了10个数字作为任务
# 加载⼀个异常捕获
try:
r = (timeout=5) # 每次等待5秒,取结果队列中的值
print("Result: %s" % r)
except queue.Empty:
print("result queue is empty.")
# 最后⼀定要关闭服务,不然会报管道未关闭的错误
manager.shutdown()
print("master exit.")
if __name__ == '__main__':
# Windows下多进程可能出现问题,添加以下代码可以缓解
freeze_support()
print("Start!")
# 运⾏服务进程
test()
任务进程
# coding: utf-8
# 定义具体的任务进程,具体的⼯作任务是什么
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager,继承BaseManager,⽤于后⾯创建管理器
class QueueManager(BaseManager):
pass
# 第⼀步:使⽤QueueManager注册⽤于获取Queue的⽅法名称
# 前⾯服务进程已经将队列名称暴露到⽹络中,
# 该任务进程注册时只需要提供名称即可,与服务进程中队列名称⼀致ister('get_task_queue')
# 第⼆步:连接到服务器,也就是运⾏服务进程代码的机器
server_addr = '127.0.0.1'
print("Connet to server %s..." % server_addr)
# 创建⼀个管理器实例,端⼝和验证⼝令保持与服务进程中完全⼀致
m = QueueManager(address=(server_addr, 8001), authkey=b'abc')
# 连接到⽹络服务器
# 第三步:从⽹络上获取Queue对象,并进⾏本地化,与服务进程是同⼀个队列task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:从task队列获取任务,并把结果写⼊到resul队列
for i in range(10):
try:
# 前⾯服务进程向task队列中放⼊了n,这⾥取出n
# n和n相乘,并将相乘的算式和结果放⼊到result队列中去
n = (timeout=1) # 每次等待1秒后取出任务
print("run task %d * %d..." % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print("task queue is empty.")
# 任务处理结束
print("worker exit.")
启动分布式进程
先运⾏服务进程,再运⾏任务进程
服务进程运⾏结果
Start!
Put task 2 ...
Put task 0 ...
Put task 0 ...
Put task 6 ...
Put task 6 ...
Put task 8 ...
Put task 8 ...
Put task 4 ...
Put task 1 ...
Put task 1 ...
Try
Result: 2 * 2 = 4
Result: 0 * 0 = 0
狭基线纹香茶菜Result: 0 * 0 = 0
Result: 6 * 6 = 36
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 8 * 8 = 64
Result: 4 * 4 = 16
Result: 1 * 1 = 1
Result: 1 * 1 = 1
result queue is empty.
master exit.
Process finished with exit code 0
任务进程运⾏结果
Connet to server 127.0.
run task 2 * 2...
精油加工设备run task 0 * 0...
run task 0 * 0...
run task 6 * 6...
run task 6 * 6...
run task 8 * 8...
run task 8 * 8...
run task 4 * 4...
run task 1 * 1...
run task 1 * 1...
worker exit.
Process finished with exit code 0
知识补充1
当我们在⼀台机器上写多进程程序时,创建的Queue可以直接拿来⽤,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进⾏操作,那样就绕过了QueueManager的封装,必须通过_task_queue()获得的Queue接⼝添加。然后,在另⼀台机器上启动任务进程(本机上启动也可以)
知识补充2
棉籽皮其中task_queue和result_queue是两个队列,分别存放任务和结果。它们⽤来进⾏进程间通信,交换对象。
因为是分布式的环境,放⼊queue中的数据需要等待Workers机器运算处理后再进⾏读取,
这样就需要对queue⽤QueueManager进⾏封装放到⽹络中,这是通过上⾯的2⾏代码来实现的。我们给return_task_queue的⽹络调⽤接⼝取了⼀个名get_task_queue,⽽return_result_queue的名字是get_result_queue,⽅便区分对哪个queue进⾏操作。task.put(n)即是对task_queue进⾏写⼊数据,相当于分配任务。⽽()即是等待workers机器处理后返回的结果。
知识补充3
金属探测器隐藏这个简单的Master/Worker模型有什么⽤?其实这就是⼀个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到⼏台甚⾄⼏⼗台机器上,⽐如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。
Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:
⽽Queue之所以能通过⽹络访问,就是通过QueueManager实现的。由于QueueManager管理的不⽌⼀个Queue,所以,要给每个Queue的⽹络调⽤接⼝起个名字,⽐如get_task_queue。task_worker这⾥的QueueManager注册的名字必须和task_manager中的⼀样。对⽐上⾯的例⼦,可以看出Queue对象从另⼀个进程通过⽹络传递了过来。只不过这⾥的传递和⽹络通信由QueueManager完成。
authkey有什么⽤?这是为了保证两台机器正常通信,不被其他机器恶意⼲扰。如果task_worker.py的authkey和task_master.py的authkey不⼀致,肯定连接不上。

本文发布于:2024-09-22 05:25:17,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/2/101662.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:进程   任务   队列
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议