python3學(xué)習(xí)筆記之多進(jìn)程分布式小例子
最近一直跟著廖大在學(xué)Python,關(guān)于分布式進(jìn)程的小例子挺有趣的,這里做個(gè)記錄。
分布式進(jìn)程
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。由于managers模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫分布式多進(jìn)程程序。
master服務(wù)端原理:通過managers模塊把Queue通過網(wǎng)絡(luò)暴露出去,其他機(jī)器的進(jìn)程就可以訪問Queue了
服務(wù)進(jìn)程負(fù)責(zé)啟動(dòng)Queue,把Queue注冊(cè)到網(wǎng)絡(luò)上,然后往Queue里面寫入任務(wù),代碼如下:
#task_master.py
#coding=utf-8
#多進(jìn)程分布式例子
#服務(wù)器端
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support #server啟動(dòng)報(bào)錯(cuò),提示需要引用此包
import random,time,queue
#發(fā)送任務(wù)的隊(duì)列
task_queue = queue.Queue()
#接收結(jié)果的隊(duì)列
result_queue = queue.Queue()
#從BaseManager繼承的QueueManager
class QueueManager(BaseManager):
pass
#win7 64 貌似不支持callable下調(diào)用匿名函數(shù)lambda,這里封裝一下
def return_task_queue():
global task_queue
return task_queue
def return_result_queue():
global result_queue
return result_queue
def test():
#把兩個(gè)Queue注冊(cè)到網(wǎng)絡(luò)上,callable參數(shù)關(guān)聯(lián)了Queue對(duì)象
#QueueManager.register('get_task_queue',callable=lambda:task_queue)
#QueueManager.register('get_result_queue',callable=lambda:result_queue)
QueueManager.register('get_task_queue',callable=return_task_queue)
QueueManager.register('get_result_queue',callable=return_result_queue)
#綁定端口5000,設(shè)置驗(yàn)證碼‘a(chǎn)bc'
manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')#這里必須加上本地默認(rèn)ip地址127.0.0.1
#啟動(dòng)Queue
manager.start()
#server = manager.get_server()
#server.serve_forever()
print('start server master')
#獲得通過網(wǎng)絡(luò)訪問的Queue對(duì)象
task = manager.get_task_queue()
result = manager.get_result_queue()
#放幾個(gè)任務(wù)進(jìn)去
for i in range(10):
n = random.randint(0,10000)
print('put task %d...' % n)
task.put(n)
#從result隊(duì)列讀取結(jié)果
print('try get results...')
for i in range(10):
r = result.get(timeout=10)
print('result:%s' % r)
#關(guān)閉
manager.shutdown()
print('master exit')
if __name__ == '__main__':
freeze_support()
test()
運(yùn)行截圖如下:

在分布式多進(jìn)程環(huán)境下,添加任務(wù)到Queue不可以直接對(duì)原始的task_queue進(jìn)行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。
任務(wù)進(jìn)程,代碼如下:
#task_worker.py
#coding=utf-8
#多進(jìn)程分布式例子
#非服務(wù)端:worker
import time,sys,queue
from multiprocessing.managers import BaseManager
#創(chuàng)建類似的QueueManager
class QueueManager(BaseManager):
pass
#由于這個(gè)QueueManager只從網(wǎng)絡(luò)上獲取Queue,所以注冊(cè)時(shí)只提供名字即可
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#連接到服務(wù)器,也就是運(yùn)行task_master.py的機(jī)器
server_addr = '127.0.0.1'
print('connect to server %s...'% server_addr)
#端口和驗(yàn)證碼注意要保持完全一致
m = QueueManager(address=(server_addr,5000),authkey=b'abc')
#從網(wǎng)絡(luò)連接
m.connect()
#獲取Queue的對(duì)象
task = m.get_task_queue()
result = m.get_result_queue()
#從task隊(duì)列獲取任務(wù),并把結(jié)果寫入result隊(duì)列
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...'% (n,n))
r = '%d * %d = %d' % (n,n,n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty')
#處理結(jié)果
print('worker exit')
運(yùn)行截圖如下:

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python3.6基于正則實(shí)現(xiàn)的計(jì)算器示例【無優(yōu)化簡(jiǎn)單注釋版】
這篇文章主要介紹了Python3.6基于正則實(shí)現(xiàn)的計(jì)算器,涉及Python基于正則表達(dá)式的算術(shù)式遍歷、查找及數(shù)學(xué)運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2018-06-06
獲取python運(yùn)行輸出的數(shù)據(jù)并解析存為dataFrame實(shí)例
這篇文章主要介紹了獲取python運(yùn)行輸出的數(shù)據(jù)并解析存為dataFrame實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-07-07
卷積神經(jīng)網(wǎng)絡(luò)如何實(shí)現(xiàn)提取特征
這篇文章主要介紹了卷積神經(jīng)網(wǎng)絡(luò)如何實(shí)現(xiàn)提取特征問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
Matplotlib繪圖基礎(chǔ)之文本標(biāo)注詳解
Matplotlib?文本和標(biāo)注可以為數(shù)據(jù)和圖形之間提供額外的信息,幫助觀察者更好地理解數(shù)據(jù)和圖形的含義,下面就將通過示例依次介紹文本和標(biāo)注的常用使用方式2023-08-08
Python入門及進(jìn)階筆記 Python 內(nèi)置函數(shù)小結(jié)
這篇文章主要介紹了Python的內(nèi)置函數(shù)小結(jié),需要的朋友可以參考下2014-08-08
Python實(shí)現(xiàn)遍歷目錄的方法【測(cè)試可用】
這篇文章主要介紹了Python實(shí)現(xiàn)遍歷目錄的方法,涉及Python針對(duì)目錄與文件的遍歷、判斷、讀取相關(guān)操作技巧,需要的朋友可以參考下2017-03-03

