Python多進(jìn)程庫multiprocessing中進(jìn)程池Pool類的使用詳解
問題起因
最近要將一個文本分割成好幾個topic,每個topic設(shè)計(jì)一個regressor,各regressor是相互獨(dú)立的,最后匯總所有topic的regressor得到總得預(yù)測結(jié)果。沒錯!類似bagging ensemble!只是我沒有抽樣。文本不大,大概3000行,topic個數(shù)為8,于是我寫了一個串行的程序,一個topic算完之后再算另一個topic??墒俏以诿總€topic中用了GridSearchCV來調(diào)參,又要選特征又要調(diào)整regressor的參數(shù),導(dǎo)致參數(shù)組合一共有1782種。我真是低估了調(diào)參的時間,程序跑了一天一夜最后因?yàn)橥沬mport一個庫導(dǎo)致最終的預(yù)測精度沒有算出來。后來想到,既然每個topic的預(yù)測都是獨(dú)立的,那是不是可以并行呢?
Python中的多線程與多進(jìn)程
但是聽聞Python的多線程實(shí)際上并不能真正利用多核,所以如果使用多線程實(shí)際上還是在一個核上做并發(fā)處理。不過,如果使用多進(jìn)程就可以真正利用多核,因?yàn)楦鬟M(jìn)程之間是相互獨(dú)立的,不共享資源,可以在不同的核上執(zhí)行不同的進(jìn)程,達(dá)到并行的效果。同時在我的問題中,各topic相互獨(dú)立,不涉及進(jìn)程間的通信,只需最后匯總結(jié)果,因此使用多進(jìn)程是個不錯的選擇。
multiprocessing
一個子進(jìn)程
multiprocessing模塊提供process類實(shí)現(xiàn)新建進(jìn)程。下述代碼是新建一個子進(jìn)程。
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',)) # 新建一個子進(jìn)程p,目標(biāo)函數(shù)是f,args是函數(shù)f的參數(shù)列表
p.start() # 開始執(zhí)行進(jìn)程
p.join() # 等待子進(jìn)程結(jié)束
上述代碼中p.join()的意思是等待子進(jìn)程結(jié)束后才執(zhí)行后續(xù)的操作,一般用于進(jìn)程間通信。例如有一個讀進(jìn)程pw和一個寫進(jìn)程pr,在調(diào)用pw之前需要先寫pr.join(),表示等待寫進(jìn)程結(jié)束之后才開始執(zhí)行讀進(jìn)程。
多個子進(jìn)程
如果要同時創(chuàng)建多個子進(jìn)程可以使用multiprocessing.Pool類。該類可以創(chuàng)建一個進(jìn)程池,然后在多個核上執(zhí)行這些進(jìn)程。
import multiprocessing
import time
def func(msg):
print multiprocessing.current_process().name + '-' + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4) # 創(chuàng)建4個進(jìn)程
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close() # 關(guān)閉進(jìn)程池,表示不能在往進(jìn)程池中添加進(jìn)程
pool.join() # 等待進(jìn)程池中的所有進(jìn)程執(zhí)行完畢,必須在close()之后調(diào)用
print "Sub-process(es) done."
輸出結(jié)果如下:
Sub-process(es) done. PoolWorker-34-hello 1 PoolWorker-33-hello 0 PoolWorker-35-hello 2 PoolWorker-36-hello 3 PoolWorker-34-hello 7 PoolWorker-33-hello 4 PoolWorker-35-hello 5 PoolWorker-36-hello 6 PoolWorker-33-hello 8 PoolWorker-36-hello 9
上述代碼中的pool.apply_async()是apply()函數(shù)的變體,apply_async()是apply()的并行版本,apply()是apply_async()的阻塞版本,使用apply()主進(jìn)程會被阻塞直到函數(shù)執(zhí)行結(jié)束,所以說是阻塞版本。apply()既是Pool的方法,也是Python內(nèi)置的函數(shù),兩者等價??梢钥吹捷敵鼋Y(jié)果并不是按照代碼for循環(huán)中的順序輸出的。
多個子進(jìn)程并返回值
apply_async()本身就可以返回被進(jìn)程調(diào)用的函數(shù)的返回值。上一個創(chuàng)建多個子進(jìn)程的代碼中,如果在函數(shù)func中返回一個值,那么pool.apply_async(func, (msg, ))的結(jié)果就是返回pool中所有進(jìn)程的值的對象(注意是對象,不是值本身)。
import multiprocessing
import time
def func(msg):
return multiprocessing.current_process().name + '-' + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4) # 創(chuàng)建4個進(jìn)程
results = []
for i in xrange(10):
msg = "hello %d" %(i)
results.append(pool.apply_async(func, (msg, )))
pool.close() # 關(guān)閉進(jìn)程池,表示不能再往進(jìn)程池中添加進(jìn)程,需要在join之前調(diào)用
pool.join() # 等待進(jìn)程池中的所有進(jìn)程執(zhí)行完畢
print ("Sub-process(es) done.")
for res in results:
print (res.get())
上述代碼輸出結(jié)果如下:
Sub-process(es) done. PoolWorker-37-hello 0 PoolWorker-38-hello 1 PoolWorker-39-hello 2 PoolWorker-40-hello 3 PoolWorker-37-hello 4 PoolWorker-38-hello 5 PoolWorker-39-hello 6 PoolWorker-37-hello 7 PoolWorker-40-hello 8 PoolWorker-38-hello 9
與之前的輸出不同,這次的輸出是有序的。
如果電腦是八核,建立8個進(jìn)程,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的,如下圖:

在system monitor中也可以清楚看到執(zhí)行多進(jìn)程前后CPU使用率曲線的差異。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python起點(diǎn)網(wǎng)月票榜字體反爬案例
大家好,本篇文章主要講的是python起點(diǎn)網(wǎng)月票榜字體反爬案例,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下,方便下次瀏覽2021-12-12
python 實(shí)現(xiàn)ping測試延遲的兩種方法
這篇文章主要介紹了python 實(shí)現(xiàn)ping測試延遲的兩種方法,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-12-12
Python并發(fā)concurrent.futures和asyncio實(shí)例
這篇文章主要介紹了Python并發(fā)concurrent.futures和asyncio實(shí)例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05
python基于Pandas讀寫MySQL數(shù)據(jù)庫
這篇文章主要介紹了python基于Pandas讀寫MySQL數(shù)據(jù)庫,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下2021-04-04
Python 26進(jìn)制計(jì)算實(shí)現(xiàn)方法
這篇文章主要介紹了Python 26進(jìn)制計(jì)算實(shí)現(xiàn)方法,涉及Python字符串與數(shù)值計(jì)算的相關(guān)操作技巧,需要的朋友可以參考下2015-05-05

