剑客
关注科技互联网

[Python]-15-分布式进程

引言

这篇文章实例演示如何通过multiprocessing的子模块managers将进程中的Queue任务信息通过网络传递给其它机器,实现分布式计算。

文章目录

0×1.分布式进程实例

multiprocessing模块的managers子模块支持把多进程分布到多台机器上,一台机器上的服务进程可以作为调度者,依靠网络将任务传送到不同机器的多个进程中执行。

创建了两个py文件,调度文件名为server.py,运行在局域网A(192.168.1.233)机器上,接受调度的文件名为client.py,运行在局域网B机器上,A与B在同一网段能够互相通信,在调度文件中随机生成5个数字,通过网络将这5个数字传送给B,在B中实现平方计算,再将计算结果传输给A,请看下面的实例:

server.py文件内容如下:

#!/usr/bin/env python3
					#coding=utf-8

					#导入managers子模块中的BaseManager类,这个类封装了一些常用的网络传输和接口方法
					from multiprocessing.managers import BaseManager
					import queue,time,random

					#初始化两个queue消息列队,一个用于传输,一个用于接收
					sendMsg=queue.Queue()
					receiveMsg=queue.Queue()

					#新建一个类,继承BaseManager的所有方法和属性
					class commBase(BaseManager):
					    pass

					#使用BaseManager类的register方法,生成两个接口函数,名为"send_msg"和"receive_msg",使用callable参数,将这两个接口函数关联到不同的queue对象上(相当于定义了两个返回queue的网络接口函数)
					commBase.register("send_msg",callable=lambda :sendMsg)
					commBase.register("receive_msg",callable=lambda :receiveMsg)

					#监听本地的20086端口,验证码为"www.qingsword.com"(在接收端需要配置相同的验证码才能连接到这台机器的20086端口)
					commMgr=commBase(address=("",20086), authkey=b"www.qingsword.com")

					#启动网络监听(此时会在系统中发现127.0.0.1:20086端口处于监听状态)
					commMgr.start()
					
					#获得上面创建的接口函数对象(queue对象)
					send=commMgr.send_msg()
					receive=commMgr.receive_msg()

					#随机生成5个1~1000以内的数字,将它们放到进程中的queue网络接口消息列队中
					for x in range(5):
					    n=random.randint(1,1000)
					    print("将整数'%s'放入待发送的消息列队..."%n)
					    send.put(n)

					#之后这个程序将被阻塞,在20086端口上等待消息的返回
					print("等待计算结果返回...")
					for x in range(5):
					    r=receive.get(True)
					    print(r)
					
					#关闭接口,释放资源
					commMgr.shutdown()
					print("End")

client.py文件内容如下:

#!/usr/bin/env python3
					#coding=utf-8
					from multiprocessing.managers import BaseManager

					class commBase(BaseManager):
					    pass
					
					#接收端只需要注册两个与调度端相同的网络接口函数名称即可
					commBase.register("send_msg")
					commBase.register("receive_msg")
					
					#调度端IP(如果想使用同一台计算机测试,可以改成127.0.0.0,这样就是自己发自己接)
					ip_address="192.168.1.233"

					#调度端IP,端口,验证码
					commMgr=commBase(address=(ip_address,20086),authkey=b"www.qingsword.com")

					try:
					    #连接
					    commMgr.connect()

					    #实例化接口函数
					    send=commMgr.send_msg()
					    receive=commMgr.receive_msg() 

					    #从调度端send_msg()接口get消息,然后将结果返回给调度端的receive_msg() 接口
					    for x in range(5):
					        print("开始从'%s'读取消息..."%ip_address)
					        n=send.get(True)
					        print("开始计算:%d*%d"%(n,n))
					        r="%d*%d=%d"%(n,n,n*n)
					        receive.put(r)
					except Exception:
					    print("连接失败。")
					else:
					    print("计算完成...")
#先在A上运行server.py,输出如下
					将整数'363'放入待发送的消息列队...
					将整数'36'放入待发送的消息列队...
					将整数'700'放入待发送的消息列队...
					将整数'242'放入待发送的消息列队...
					将整数'754'放入待发送的消息列队...
					等待计算结果返回...

					#然后在B机器上运行client.py,输出结果如下
					开始从'192.168.1.233'读取消息...
					开始计算:363*363
					开始从'192.168.1.233'读取消息...
					开始计算:36*36
					开始从'192.168.1.233'读取消息...
					开始计算:700*700
					开始从'192.168.1.233'读取消息...
					开始计算:242*242
					开始从'192.168.1.233'读取消息...
					开始计算:754*754
					计算完成...

					#之后A机器上将看到下面的信息
					363*363=131769
					36*36=1296
					700*700=490000
					242*242=58564
					754*754=568516
					End

上面这就是一个简单完整的分布式计算模型,稍加改进就能完成很多其他的分布式作业。

Ps:如果在执行过程中遇到错误,请检查本地端口是否被占用,两端机器网络连接是否正常,两端验证码是否相同。

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址