[Python]-15-分布式进程

引言 这篇文章实例演示如何通过multiprocessing的子模块managers将进程中的Queue任务信息通过网络传递给其它机器,实现分布式计算。

引言

这篇文章实例演示如何通过multiprocessing的子模块managers将进程中的Queue任务信息通过网络传递给其它机器,实现分布式计算。

文章目录

0×1.分布式进程实例

multiprocessing模块的managers子模块支持把多进程分布到多台机器上,一台机器上的服务进程可以作为调度者,依靠网络将任务传送到不同机器的多个进程中执行。

创建了两个py文件,调度文件名为server.py,运行在局域网A(192.168.1.233)机器上,接受调度的文件名为client.py,运行在局域网B机器上,A与B在同一网段能够互相通信,在调度文件中随机生成5个数字,通过网络将这5个数字传送给B,在B中实现平方计算,再将计算结果传输给A,请看下面的实例:

server.py文件内容如下:

#!/usr/bin/env python3

#coding=utf-8

#导入managers子模块中的BaseManager类,这个类封装了一些常用的网络传输和接口方法

from multiprocessing.managers import BaseManager

import queue,time,random

#初始化两个queue消息列队,一个用于传输,一个用于接收

sendMsg=queue.Queue()

receiveMsg=queue.Queue()

#新建一个类,继承BaseManager的所有方法和属性

class commBase(BaseManager):

pass

#使用BaseManager类的register方法,生成两个接口函数,名为"send_msg"和"receive_msg",使用callable参数,将这两个接口函数关联到不同的queue对象上(相当于定义了两个返回queue的网络接口函数)

commBase.register("send_msg",callable=lambda :sendMsg)

commBase.register("receive_msg",callable=lambda :receiveMsg)

#监听本地的20086端口,验证码为"www.qingsword.com"(在接收端需要配置相同的验证码才能连接到这台机器的20086端口)

commMgr=commBase(address=("",20086), authkey=b"www.qingsword.com")

#启动网络监听(此时会在系统中发现127.0.0.1:20086端口处于监听状态)

commMgr.start()

#获得上面创建的接口函数对象(queue对象)

send=commMgr.send_msg()

receive=commMgr.receive_msg()

#随机生成5个1~1000以内的数字,将它们放到进程中的queue网络接口消息列队中

for x in range(5):

n=random.randint(1,1000)

print("将整数'%s'放入待发送的消息列队..."%n)

send.put(n)

#之后这个程序将被阻塞,在20086端口上等待消息的返回

print("等待计算结果返回...")

for x in range(5):

r=receive.get(True)

print(r)

#关闭接口,释放资源

commMgr.shutdown()

print("End")

client.py文件内容如下:

#!/usr/bin/env python3

#coding=utf-8

from multiprocessing.managers import BaseManager

class commBase(BaseManager):

pass

#接收端只需要注册两个与调度端相同的网络接口函数名称即可

commBase.register("send_msg")

commBase.register("receive_msg")

#调度端IP(如果想使用同一台计算机测试,可以改成127.0.0.0,这样就是自己发自己接)

ip_address="192.168.1.233"

#调度端IP,端口,验证码

commMgr=commBase(address=(ip_address,20086),authkey=b"www.qingsword.com")

try:

#连接

commMgr.connect()

#实例化接口函数

send=commMgr.send_msg()

receive=commMgr.receive_msg()

#从调度端send_msg()接口get消息,然后将结果返回给调度端的receive_msg() 接口

for x in range(5):

print("开始从'%s'读取消息..."%ip_address)

n=send.get(True)

print("开始计算:%d*%d"%(n,n))

r="%d*%d=%d"%(n,n,n*n)

receive.put(r)

except Exception:

print("连接失败。")

else:

print("计算完成...")

#先在A上运行server.py,输出如下

将整数'363'放入待发送的消息列队...

将整数'36'放入待发送的消息列队...

将整数'700'放入待发送的消息列队...

将整数'242'放入待发送的消息列队...

将整数'754'放入待发送的消息列队...

等待计算结果返回...

#然后在B机器上运行client.py,输出结果如下

开始从'192.168.1.233'读取消息...

开始计算:363*363

开始从'192.168.1.233'读取消息...

开始计算:36*36

开始从'192.168.1.233'读取消息...

开始计算:700*700

开始从'192.168.1.233'读取消息...

开始计算:242*242

开始从'192.168.1.233'读取消息...

开始计算:754*754

计算完成...

#之后A机器上将看到下面的信息

363*363=131769

36*36=1296

700*700=490000

242*242=58564

754*754=568516

End

上面这就是一个简单完整的分布式计算模型,稍加改进就能完成很多其他的分布式作业。

Ps:如果在执行过程中遇到错误,请检查本地端口是否被占用,两端机器网络连接是否正常,两端验证码是否相同。

未登录用户
全部评论0
到底啦