Python利用multiprocessing實(shí)現(xiàn)最簡單的分布式作業(yè)調(diào)度系統(tǒng)實(shí)例
介紹
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺機(jī)器上。一個服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個機(jī)器的多個進(jìn)程中,依靠網(wǎng)絡(luò)通信。想到這,就在想是不是可以使用此模塊來實(shí)現(xiàn)一個簡單的作業(yè)調(diào)度系統(tǒng)。在這之前,我們先來詳細(xì)了解下python中的多進(jìn)程管理包multiprocessing。
multiprocessing.Process
multiprocessing包是Python中的多進(jìn)程管理包。它與 threading.Thread類似,可以利用multiprocessing.Process對象來創(chuàng)建一個進(jìn)程。該進(jìn)程可以允許放在Python程序內(nèi)部編寫的函數(shù)中。該P(yáng)rocess對象與Thread對象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過start()設(shè)置)、exitcode(進(jìn)程在運(yùn)行時為None、如果為–N,表示被信號N結(jié)束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步進(jìn)程,其用法也與threading包中的同名類一樣。multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進(jìn)程的情境。
這個模塊表示像線程一樣管理進(jìn)程,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多。
看一下Process類的構(gòu)造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數(shù)說明:
- group:進(jìn)程所屬組?;静挥?/li>
- target:表示調(diào)用對象。
- args:表示調(diào)用對象的位置參數(shù)元組。
- name:別名
- kwargs:表示調(diào)用對象的字典。
創(chuàng)建進(jìn)程的簡單實(shí)例:
#coding=utf-8 import multiprocessing def do(n) : #獲取當(dāng)前線程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."
執(zhí)行結(jié)果:
Process-1 starting worker 0 Process end. Process-2 starting worker 1 Process end. Process-3 starting worker 2 Process end. Process-4 starting worker 3 Process end. Process-5 starting worker 4 Process end.
創(chuàng)建子進(jìn)程時,只需要傳入一個執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個Process實(shí)例,并用其start()方法啟動,join()方法表示等待子進(jìn)程結(jié)束以后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。
注意:
在Windows上要想使用進(jìn)程模塊,就必須把有關(guān)進(jìn)程的代碼寫在當(dāng)前.py文件的if __name__ == ‘__main__' :語句的下面,才能正常使用Windows下的進(jìn)程模塊。Unix/Linux下則不需要。
multiprocess.Pool
當(dāng)被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進(jìn)程,十幾個還好,但如果是上百個,上千個目標(biāo),手動的去限制進(jìn)程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進(jìn)程池的功效。
Pool可以提供指定數(shù)量的進(jìn)程供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,才會創(chuàng)建新的進(jìn)程來它。
apply_async和apply
函數(shù)原型:
apply_async(func[, args=()[, kwds={}[, callback=None]]])
二者都是向進(jìn)程池中添加新的進(jìn)程,不同的時,apply每次添加新的進(jìn)程時,主進(jìn)程和新的進(jìn)程會并行執(zhí)行,但是主進(jìn)程會阻塞,直到新進(jìn)程的函數(shù)執(zhí)行結(jié)束。 這是很低效的,所以python3.x之后不再使用
apply_async和apply功能相同,但是主進(jìn)程不會阻塞。
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(msg):
print "*msg: ", msg
time.sleep(3)
print "*end"
if __name__ == "__main__":
# 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去
pool = multiprocessing.Pool(processes=3)
for i in range(10):
msg = "hello [{}]".format(i)
# pool.apply(func, (msg,))
pool.apply_async(func, (msg,)) # 異步開啟進(jìn)程, 非阻塞型, 能夠向池中添加進(jìn)程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán)
print "--" * 10
pool.close() # 關(guān)閉pool, 則不會有新的進(jìn)程添加進(jìn)去
pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
print "All process done."
運(yùn)行結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py -------------------- *msg: hello [0] *msg: hello [1] *msg: hello [2] *end *msg: hello [3] *end *end *msg: hello [4] *msg: hello [5] *end *msg: hello [6] *end *end *msg: hello [7] *msg: hello [8] *end *msg: hello [9] *end*end *end All process done. Process finished with exit code 0
獲得進(jìn)程的執(zhí)行結(jié)果
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func_with_return(msg):
print "*msg: ", msg
time.sleep(3)
print "*end"
return "{} return".format(msg)
if __name__ == "__main__":
# 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去
pool = multiprocessing.Pool(processes=3)
results = []
for i in range(10):
msg = "hello [{}]".format(i)
res = pool.apply_async(func_with_return, (msg,)) # 異步開啟進(jìn)程, 非阻塞型, 能夠向池中添加進(jìn)程而不等待其執(zhí)行完畢就能再次執(zhí)行循環(huán)
results.append(res)
print "--" * 10
pool.close() # 關(guān)閉pool, 則不會有新的進(jìn)程添加進(jìn)去
pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
print "All process done."
print "Return results: "
for i in results:
print i.get() # 獲得進(jìn)程的執(zhí)行結(jié)果
結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py -------------------- *msg: hello [0] *msg: hello [1] *msg: hello [2] *end *end *msg: hello [3] *msg: hello [4] *end *msg: hello [5] *end *end *msg: hello [6] *msg: hello [7] *end *msg: hello [8] *end *end *msg: hello [9] *end *end All process done. Return results: hello [0] return hello [1] return hello [2] return hello [3] return hello [4] return hello [5] return hello [6] return hello [7] return hello [8] return hello [9] return Process finished with exit code 0
map
函數(shù)原型:
map(func, iterable[, chunksize=None])
Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會使進(jìn)程阻塞直到返回結(jié)果。
注意,雖然第二個參數(shù)是一個迭代器,但在實(shí)際使用中,必須在整個隊(duì)列都就緒后,程序才會運(yùn)行子進(jìn)程。
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func_with_return(msg):
print "*msg: ", msg
time.sleep(3)
print "*end"
return "{} return".format(msg)
if __name__ == "__main__":
# 維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去
pool = multiprocessing.Pool(processes=3)
results = []
msgs = []
for i in range(10):
msg = "hello [{}]".format(i)
msgs.append(msg)
results = pool.map(func_with_return, msgs)
print "--" * 10
pool.close() # 關(guān)閉pool, 則不會有新的進(jìn)程添加進(jìn)去
pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執(zhí)行完畢
print "All process done."
print "Return results: "
for i in results:
print i # 獲得進(jìn)程的執(zhí)行結(jié)果
執(zhí)行結(jié)果:
"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py *msg: hello [0] *msg: hello [1] *msg: hello [2] *end*end *msg: hello [3] *msg: hello [4] *end *msg: hello [5] *end*end *msg: hello [6] *msg: hello [7] *end *msg: hello [8] *end *end *msg: hello [9] *end *end -------------------- All process done. Return results: hello [0] return hello [1] return hello [2] return hello [3] return hello [4] return hello [5] return hello [6] return hello [7] return hello [8] return hello [9] return Process finished with exit code 0
注意:執(zhí)行結(jié)果中“—-”的位置,可以看到,map之后,主進(jìn)程是阻塞的,等待map的結(jié)果返回
close()
關(guān)閉進(jìn)程池(pool),使其不在接受新的任務(wù)。
terminate()
結(jié)束工作進(jìn)程,不在處理未處理的任務(wù)。
join()
主進(jìn)程阻塞等待子進(jìn)程的退出,join方法必須在close或terminate之后使用。
進(jìn)程間通信
多進(jìn)程最麻煩的地方就是進(jìn)程間通信,IPC比線程通信要難處理的多,所以留作單獨(dú)一篇來記錄
利用multiprocessing實(shí)現(xiàn)一個最簡單的分布式作業(yè)調(diào)度系統(tǒng)
Job
首先創(chuàng)建一個Job類,為了測試簡單,只包含一個job id屬性,將來可以封裝一些作業(yè)狀態(tài),作業(yè)命令,執(zhí)行用戶等屬性。
job.py
#!/usr/bin/env python # -*- coding: utf-8 -*- class Job: def __init__(self, job_id): self.job_id = job_id
Master
Master用來派發(fā)作業(yè)和顯示運(yùn)行完成的作業(yè)信息
master.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job
class Master:
def __init__(self):
# 派發(fā)出去的作業(yè)隊(duì)列
self.dispatched_job_queue = Queue()
# 完成的作業(yè)隊(duì)列
self.finished_job_queue = Queue()
def get_dispatched_job_queue(self):
return self.dispatched_job_queue
def get_finished_job_queue(self):
return self.finished_job_queue
def start(self):
# 把派發(fā)作業(yè)隊(duì)列和完成作業(yè)隊(duì)列注冊到網(wǎng)絡(luò)上
BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)
# 監(jiān)聽端口和啟動服務(wù)
manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
manager.start()
# 使用上面注冊的方法獲取隊(duì)列
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()
# 這里一次派發(fā)10個作業(yè),等到10個作業(yè)都運(yùn)行完后,繼續(xù)再派發(fā)10個作業(yè)
job_id = 0
while True:
for i in range(0, 10):
job_id = job_id + 1
job = Job(job_id)
print('Dispatch job: %s' % job.job_id)
dispatched_jobs.put(job)
while not dispatched_jobs.empty():
job = finished_jobs.get(60)
print('Finished Job: %s' % job.job_id)
manager.shutdown()
if __name__ == "__main__":
master = Master()
master.start()
Slave
Slave用來運(yùn)行master派發(fā)的作業(yè)并將結(jié)果返回
slave.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job
class Slave:
def __init__(self):
# 派發(fā)出去的作業(yè)隊(duì)列
self.dispatched_job_queue = Queue()
# 完成的作業(yè)隊(duì)列
self.finished_job_queue = Queue()
def start(self):
# 把派發(fā)作業(yè)隊(duì)列和完成作業(yè)隊(duì)列注冊到網(wǎng)絡(luò)上
BaseManager.register('get_dispatched_job_queue')
BaseManager.register('get_finished_job_queue')
# 連接master
server = '127.0.0.1'
print('Connect to server %s...' % server)
manager = BaseManager(address=(server, 8888), authkey='jobs')
manager.connect()
# 使用上面注冊的方法獲取隊(duì)列
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()
# 運(yùn)行作業(yè)并返回結(jié)果,這里只是模擬作業(yè)運(yùn)行,所以返回的是接收到的作業(yè)
while True:
job = dispatched_jobs.get(timeout=1)
print('Run job: %s ' % job.job_id)
time.sleep(1)
finished_jobs.put(job)
if __name__ == "__main__":
slave = Slave()
slave.start()
測試
分別打開三個linux終端,第一個終端運(yùn)行master,第二個和第三個終端用了運(yùn)行slave,運(yùn)行結(jié)果如下
master
$ python master.py Dispatch job: 1 Dispatch job: 2 Dispatch job: 3 Dispatch job: 4 Dispatch job: 5 Dispatch job: 6 Dispatch job: 7 Dispatch job: 8 Dispatch job: 9 Dispatch job: 10 Finished Job: 1 Finished Job: 2 Finished Job: 3 Finished Job: 4 Finished Job: 5 Finished Job: 6 Finished Job: 7 Finished Job: 8 Finished Job: 9 Dispatch job: 11 Dispatch job: 12 Dispatch job: 13 Dispatch job: 14 Dispatch job: 15 Dispatch job: 16 Dispatch job: 17 Dispatch job: 18 Dispatch job: 19 Dispatch job: 20 Finished Job: 10 Finished Job: 11 Finished Job: 12 Finished Job: 13 Finished Job: 14 Finished Job: 15 Finished Job: 16 Finished Job: 17 Finished Job: 18 Dispatch job: 21 Dispatch job: 22 Dispatch job: 23 Dispatch job: 24 Dispatch job: 25 Dispatch job: 26 Dispatch job: 27 Dispatch job: 28 Dispatch job: 29 Dispatch job: 30
slave1
$ python slave.py Connect to server 127.0.0.1... Run job: 1 Run job: 2 Run job: 3 Run job: 5 Run job: 7 Run job: 9 Run job: 11 Run job: 13 Run job: 15 Run job: 17 Run job: 19 Run job: 21 Run job: 23
slave2
$ python slave.py Connect to server 127.0.0.1... Run job: 4 Run job: 6 Run job: 8 Run job: 10 Run job: 12 Run job: 14 Run job: 16 Run job: 18 Run job: 20 Run job: 22 Run job: 24
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關(guān)文章
Biblibili視頻投稿接口分析并以Python實(shí)現(xiàn)自動投稿功能
這篇文章主要介紹了Biblibili視頻投稿接口分析并以Python實(shí)現(xiàn)自動投稿功能,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-02-02
基于Python實(shí)現(xiàn)PDF轉(zhuǎn)換文件格式
這篇文章主要為大家詳細(xì)介紹了如何基于Python實(shí)現(xiàn)PDF轉(zhuǎn)換文件格式,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-01-01
python快速進(jìn)階利用Tkinter定制一個信息提示框
這篇文章主要介紹了python快速進(jìn)階利用Tkinter定制一個信息提示框,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
Python圖像處理庫PIL的ImageFont模塊使用介紹
這篇文章主要介紹了Python圖像處理庫PIL的ImageFont模塊使用介紹,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02

