[Python]-15-分布式进程

引言 这篇文章实例演示如何通过multiprocessing的子模块managers将进程中的Queue任务信息通过网络传递给其它机器,实现分布式计算。

引言

这篇文章实例演示如何通过multiprocessing的子模块managers将进程中的Queue任务信息通过网络传递给其它机器,实现分布式计算。

文章目录

0×1.分布式进程实例

multiprocessing模块的managers子模块支持把多进程分布到多台机器上,一台机器上的服务进程可以作为调度者,依靠网络将任务传送到不同机器的多个进程中执行。

创建了两个py文件,调度文件名为server.py,运行在局域网A(192.168.1.233)机器上,接受调度的文件名为client.py,运行在局域网B机器上,A与B在同一网段能够互相通信,在调度文件中随机生成5个数字,通过网络将这5个数字传送给B,在B中实现平方计算,再将计算结果传输给A,请看下面的实例:

server.py文件内容如下:

#!/usr/bin/env python3 #coding=utf-8 #导入managers子模块中的BaseManager类,这个类封装了一些常用的网络传输和接口方法 from multiprocessing.managers import BaseManager import queue,time,random #初始化两个queue消息列队,一个用于传输,一个用于接收 sendMsg=queue.Queue() receiveMsg=queue.Queue() #新建一个类,继承BaseManager的所有方法和属性 class commBase(BaseManager): pass #使用BaseManager类的register方法,生成两个接口函数,名为"send_msg"和"receive_msg",使用callable参数,将这两个接口函数关联到不同的queue对象上(相当于定义了两个返回queue的网络接口函数) commBase.register("send_msg",callable=lambda :sendMsg) commBase.register("receive_msg",callable=lambda :receiveMsg) #监听本地的20086端口,验证码为"www.qingsword.com"(在接收端需要配置相同的验证码才能连接到这台机器的20086端口) commMgr=commBase(address=("",20086), authkey=b"www.qingsword.com") #启动网络监听(此时会在系统中发现127.0.0.1:20086端口处于监听状态) commMgr.start() #获得上面创建的接口函数对象(queue对象) send=commMgr.send_msg() receive=commMgr.receive_msg() #随机生成5个1~1000以内的数字,将它们放到进程中的queue网络接口消息列队中 for x in range(5): n=random.randint(1,1000) print("将整数'%s'放入待发送的消息列队..."%n) send.put(n) #之后这个程序将被阻塞,在20086端口上等待消息的返回 print("等待计算结果返回...") for x in range(5): r=receive.get(True) print(r) #关闭接口,释放资源 commMgr.shutdown() print("End")

client.py文件内容如下:

#!/usr/bin/env python3 #coding=utf-8 from multiprocessing.managers import BaseManager class commBase(BaseManager): pass #接收端只需要注册两个与调度端相同的网络接口函数名称即可 commBase.register("send_msg") commBase.register("receive_msg") #调度端IP(如果想使用同一台计算机测试,可以改成127.0.0.0,这样就是自己发自己接) ip_address="192.168.1.233" #调度端IP,端口,验证码 commMgr=commBase(address=(ip_address,20086),authkey=b"www.qingsword.com") try: #连接 commMgr.connect() #实例化接口函数 send=commMgr.send_msg() receive=commMgr.receive_msg() #从调度端send_msg()接口get消息,然后将结果返回给调度端的receive_msg() 接口 for x in range(5): print("开始从'%s'读取消息..."%ip_address) n=send.get(True) print("开始计算:%d*%d"%(n,n)) r="%d*%d=%d"%(n,n,n*n) receive.put(r) except Exception: print("连接失败。") else: print("计算完成...")
#先在A上运行server.py,输出如下 将整数'363'放入待发送的消息列队... 将整数'36'放入待发送的消息列队... 将整数'700'放入待发送的消息列队... 将整数'242'放入待发送的消息列队... 将整数'754'放入待发送的消息列队... 等待计算结果返回... #然后在B机器上运行client.py,输出结果如下 开始从'192.168.1.233'读取消息... 开始计算:363*363 开始从'192.168.1.233'读取消息... 开始计算:36*36 开始从'192.168.1.233'读取消息... 开始计算:700*700 开始从'192.168.1.233'读取消息... 开始计算:242*242 开始从'192.168.1.233'读取消息... 开始计算:754*754 计算完成... #之后A机器上将看到下面的信息 363*363=131769 36*36=1296 700*700=490000 242*242=58564 754*754=568516 End

上面这就是一个简单完整的分布式计算模型,稍加改进就能完成很多其他的分布式作业。

Ps:如果在执行过程中遇到错误,请检查本地端口是否被占用,两端机器网络连接是否正常,两端验证码是否相同。

未登录用户
全部评论0
到底啦