python中如何使用分步式進(jìn)程計(jì)算詳解
前言
在python中使用多進(jìn)程和多線程都能達(dá)到同時(shí)運(yùn)行多個(gè)任務(wù),和多進(jìn)程和多線程的選擇上,應(yīng)該優(yōu)先選擇多進(jìn)程的方式,因?yàn)槎噙M(jìn)程更加穩(wěn)定,且對于進(jìn)程的操作管理也更加方便,但有一點(diǎn)是多進(jìn)程獨(dú)有的殺手锏,多進(jìn)程可以將進(jìn)程分步到多臺機(jī)器上跑,假如有很多個(gè)任務(wù),一臺機(jī)器即使開了多進(jìn)程或者多進(jìn)程跑起來還是要耗很多時(shí)間,那么這時(shí)就要想一下可否將任務(wù)分配到多臺機(jī)器上跑,這樣可以更快的完成任務(wù)。
在分步式進(jìn)程運(yùn)算中,進(jìn)程之前的通信還是依賴于Queue,但此時(shí)的隊(duì)列不能直接使用,需要使用multiprocessing.managers.BaseManager 進(jìn)行包裝,通過回調(diào)以后才能使用,既然是分步式的調(diào)用,那么應(yīng)該有一個(gè)服務(wù)端和一個(gè)客戶端,服務(wù)端通過網(wǎng)絡(luò)協(xié)議將隊(duì)列中的信息給各個(gè)客戶端進(jìn)行調(diào)用,客戶端也可以通過隊(duì)列將結(jié)果返回,然后服務(wù)端進(jìn)行結(jié)果的收集展示,流程如下

分步式流程
服務(wù)端將任務(wù)放到 task_queue 中,然后四個(gè)客戶端通過網(wǎng)絡(luò)端口從task_queue中獲取到任務(wù),然后進(jìn)行計(jì)算,再將結(jié)果放到result_queue中,最后服務(wù)端統(tǒng)一處理結(jié)果。整體的流程比較清晰,只是需要強(qiáng)調(diào),這里的隊(duì)列不能是原始的隊(duì)列,需要使用BaseManager 進(jìn)行包裝。
先看一下服務(wù)端的代碼
#coding:gbk
import time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務(wù)個(gè)數(shù)
task_number = 10
# 定義收發(fā)隊(duì)列
task_queue = queue.Queue(task_number)
result_queue = queue.Queue(task_number)
def gettask():
return task_queue
def getresult():
return result_queue
def test():
# windows下綁定調(diào)用接口不能使用lambda,所以只能先定義函數(shù)再綁定
BaseManager.register('get_task', callable=gettask)
BaseManager.register('get_result', callable=getresult)
# 綁定端口并設(shè)置驗(yàn)證碼,windows下需要填寫ip地址,linux下不填默認(rèn)為本地
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
# 啟動
manager.start()
try:
# 通過網(wǎng)絡(luò)獲取任務(wù)隊(duì)列和結(jié)果隊(duì)列
task = manager.get_task()
result = manager.get_result()
# 添加任務(wù)
for i in range(task_number):
print('Put task %d...' % i)
task.put(i)
# 每秒檢測一次是否所有任務(wù)都被執(zhí)行完
while not result.full():
print(task.qsize())
time.sleep(1)
for i in range(result.qsize()):
ans = result.get()
print('task %d is finish , runtime:%d s' % ans)
except:
print('Manager error')
finally:
manager.shutdown()
if __name__ == '__main__':
# windows下多進(jìn)程可能會炸,添加這句可以緩解
freeze_support()
test()
這里重點(diǎn)說一下 BaseManager.register('get_task', callable=gettask) 這行代碼,它的意思是注冊一個(gè)get_task的操作,執(zhí)行的操作是gettask()函數(shù),上面定義了gettask()函數(shù),返回的是task_queue,這也是之前說的不能直接使用queue.Queue,必須要使用通過BaseManager的register接口封裝過的的隊(duì)列,下面使用task = manager.get_task()來獲取到這個(gè)隊(duì)列。
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
這行代碼初始了一個(gè)manager,它綁定了本機(jī)的5002端口,并且在客戶端連接的時(shí)候需要一個(gè)密碼:123。
接下來看一下客戶端代碼。
#coding:gbk
import time, sys, queue, random
from multiprocessing.managers import BaseManager
BaseManager.register('get_task')
BaseManager.register('get_result')
conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123')
try:
conn.connect()
except:
print('連接失敗')
sys.exit()
task = conn.get_task()
result = conn.get_result()
while not task.empty():
print(task.qsize())
n = task.get(timeout = 1)
print('run task %d' % n)
sleeptime = random.randint(0,3)
time.sleep(sleeptime)
rt = (n, sleeptime)
result.put(rt)
if __name__ == '__main__':
pass;
這里主要看以下的代碼
BaseManager.register('get_task')
BaseManager.register('get_result')
這兩個(gè)是注冊函數(shù),和之前的服務(wù)端所對應(yīng),之前服務(wù)端注冊了這兩個(gè)函數(shù),這里才能注冊使用,注意這里不能注冊服務(wù)端沒有注冊的函數(shù)
運(yùn)行一下,先運(yùn)行服務(wù)端,然后再啟兩個(gè)cmd運(yùn)行客戶端,也可以在局域網(wǎng)中的另外的機(jī)器上運(yùn)行,但是要修改服務(wù)端的ip地址
服務(wù)端的結(jié)果如下
Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s
兩個(gè)客戶端的結(jié)果分別如下
客戶端1
10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9
客戶端2
7
run task 3
4
run task 6
3
run task 7
2
run task 8
一起運(yùn)行的截圖如下
結(jié)果
由于隊(duì)列是線程安全的,所以這里不用加鎖,在客戶端中打印print(task.qsize()) 當(dāng)前的隊(duì)列大小,可以看到隊(duì)列的信息中同步到各個(gè)客戶端的。
最后還是要多說一句,分步式多進(jìn)程雖然可以把任務(wù)分散到不同的機(jī)器上運(yùn)行,可以處理多任務(wù),但是如果此時(shí)服務(wù)端掛掉的話,任務(wù)就全丟掉了,所以在生產(chǎn)環(huán)境下還是考慮使用消息中間件如kafka等。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對腳本之家的支持。
相關(guān)文章
解決編碼問題:UnicodeDecodeError: 'utf-8' codec
這篇文章主要介紹了快速解決編碼問題:UnicodeDecodeError: 'utf-8' codec can't decod,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05
Python+selenium點(diǎn)擊網(wǎng)頁上指定坐標(biāo)的實(shí)例
今天小編就為大家分享一篇Python+selenium點(diǎn)擊網(wǎng)頁上指定坐標(biāo)的實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07
使用Python通過代碼創(chuàng)建圖表的詳細(xì)步驟
這篇文章主要介紹了使用Python通過代碼創(chuàng)建圖表的詳細(xì)步驟,文中介紹了如何使用DiagramasCode工具創(chuàng)建基礎(chǔ)架構(gòu)的架構(gòu)圖,并通過Python腳本生成并上傳到對象存儲桶,需要的朋友可以參考下2024-12-12
Python批量查找包含多個(gè)關(guān)鍵詞的PDF文件
在信息爆炸的時(shí)代,數(shù)據(jù)管理變得愈發(fā)重要,本文主要為大家介紹了如何通過Python批量查找包含多個(gè)關(guān)鍵詞的PDF文件,希望對大家有所幫助2024-11-11
教你利用python如何讀取txt中的數(shù)據(jù)
們使用python的時(shí)候經(jīng)常需要讀取txt文件中的內(nèi)容,下面這篇文章主要給大家介紹了關(guān)于利用python如何讀取txt中數(shù)據(jù)的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03
Python腳本實(shí)現(xiàn)Web漏洞掃描工具
這是去年畢設(shè)做的一個(gè)Web漏洞掃描小工具,主要針對簡單的SQL注入漏洞、SQL盲注和XSS漏洞。下文給大家介紹了使用說明和源代碼,一起看看吧2016-10-10
python實(shí)現(xiàn)K折交叉驗(yàn)證
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)K折交叉驗(yàn)證,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-04-04

