解決windows下python3使用multiprocessing.Pool出現(xiàn)的問(wèn)題
例如:
from multiprocessing import Pool def f(x): return x*x pool = Pool(processes=4) r=pool.map(f, range(100)) pool.close() pool.join()
在spyder里運(yùn)行直接沒(méi)反應(yīng);在shell窗口里,直接報(bào)錯(cuò),如下:
Process SpawnPoolWorker-15: Traceback (most recent call last): File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr self.run() File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker task = get() File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get return ForkingPickler.loads(res) AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
解決:
Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地變量都復(fù)制一份,因此可以使用任意的全局變量;在Windows下面,多進(jìn)程是通過(guò)啟動(dòng)新進(jìn)程完成的,所有的全局變量都是重新初始化的,在運(yùn)行過(guò)程中動(dòng)態(tài)生成、修改過(guò)的全局變量是不能使用的。
multiprocessing內(nèi)部使用pickling傳遞map的參數(shù)到不同的進(jìn)程,當(dāng)傳遞一個(gè)函數(shù)或類(lèi)時(shí),pickling將函數(shù)或者類(lèi)用所在模塊+函數(shù)/類(lèi)名的方式表示,如果對(duì)端的Python進(jìn)程無(wú)法在對(duì)應(yīng)的模塊中找到相應(yīng)的函數(shù)或者類(lèi),就會(huì)出錯(cuò)。
當(dāng)你在Interactive Console當(dāng)中創(chuàng)建函數(shù)的時(shí)候,這個(gè)函數(shù)是動(dòng)態(tài)添加到__main__模塊中的,在重新啟動(dòng)的新進(jìn)程當(dāng)中不存在,所以會(huì)出錯(cuò)。
當(dāng)不在Console中,而是在獨(dú)立Python文件中運(yùn)行時(shí),你會(huì)遇到另一個(gè)問(wèn)題:由于你下面調(diào)用multiprocessing的代碼沒(méi)有保護(hù),在新進(jìn)程加載這個(gè)模塊的時(shí)候會(huì)重新執(zhí)行這段代碼,創(chuàng)建出新的multiprocessing池,無(wú)限調(diào)用下去。
解決這個(gè)問(wèn)題的方法是永遠(yuǎn)把實(shí)際執(zhí)行功能的代碼加入到帶保護(hù)的區(qū)域中:if __name__ == '__mian__':
補(bǔ)充知識(shí):multiprocessing Pool的異常處理問(wèn)題
multiprocessing.Pool開(kāi)發(fā)多進(jìn)程程序時(shí),在某個(gè)子進(jìn)程執(zhí)行函數(shù)使用了mysql-python連接數(shù)據(jù)庫(kù),
由于程序設(shè)計(jì)問(wèn)題,沒(méi)有捕獲到所有異常,導(dǎo)致某個(gè)異常錯(cuò)誤直接拋到Pool中,導(dǎo)致整個(gè)Pool掛了,其異常錯(cuò)誤如下所示:
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 765, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results
task = get()
File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__
'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>,
(2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))
本文檔基于以上問(wèn)題對(duì)multiprocessing.Pool以及python-mysql-connector的源碼實(shí)現(xiàn)進(jìn)行分析,以定位具體的錯(cuò)誤原因。解決方法其實(shí)很簡(jiǎn)單,不要讓異常拋到Pool里就行。
問(wèn)題產(chǎn)生場(chǎng)景
python 版本centos7.3自帶的2.7.5版本,或者最新的python-2.7.14
mysql-connector庫(kù),版本是2.0及以上,可到官網(wǎng)下載最新版:mysql-connector
問(wèn)題發(fā)生的code其實(shí)可以簡(jiǎn)化為如下所示:
from multiprocessing import Pool, log_to_stderr
import logging
import mysql.connector
# open multiprocessing lib log
log_to_stderr(level=logging.DEBUG)
def func():
raise mysql.connector.Error("demo test", 100)
if __name__ == "__main__":
p = Pool(3)
res = p.apply_async(func)
res.get()
所以解決問(wèn)題很簡(jiǎn)單,在func里加個(gè)try-except就可以了。但是如果你好奇為什么為出現(xiàn)AttributeError的異常,那么可以繼續(xù)往下看。
Multiprocessing.Pool的實(shí)現(xiàn)
通過(guò)查看源碼,大致上multiprocess.Pool的實(shí)現(xiàn)如下圖所示:

當(dāng)我們執(zhí)行以下語(yǔ)句時(shí),主進(jìn)程會(huì)創(chuàng)建三個(gè)子線程:_handle_workers、_handle_results、_handle_tasks;同時(shí)會(huì)創(chuàng)建Pool(n)個(gè)數(shù)的worker子進(jìn)程。主進(jìn)程與各個(gè)worker子進(jìn)程間的通信使用內(nèi)部定義的Queue,其實(shí)就是Pipe管道通信,如上圖的_taskqueue、_inqueue和_outqueue。
p = Pool(3) res = p.apply_async(func) res.get()
這三個(gè)子線程的作用是:
1. handle_workers線程管理worker進(jìn)程,使進(jìn)程池維持Pool(n)個(gè)worker進(jìn)程數(shù);
2. handle_tasks線程將用戶的任務(wù)(包括job_id, 處理函數(shù)func等信息)傳遞到_inqueue中,子進(jìn)程們競(jìng)爭(zhēng)獲取任務(wù),然后運(yùn)行相關(guān)函數(shù),將結(jié)果放在_outqueue中,然后繼續(xù)監(jiān)聽(tīng)tasksqueue的任務(wù)列表。其實(shí)就是典型的生產(chǎn)消費(fèi)問(wèn)題。
3. handle_results線程監(jiān)聽(tīng)_outQqueue的內(nèi)容,有就拿到,通過(guò)字典_cache找到對(duì)應(yīng)的job,將結(jié)果存儲(chǔ)在*Result對(duì)象中,釋放該job的信號(hào)量,表明job執(zhí)行完畢。此后,就可以通過(guò)*Result.get()函數(shù)獲取執(zhí)行結(jié)果。
當(dāng)我們調(diào)用p.apply_async 或者p.map時(shí),其實(shí)就是創(chuàng)建了AsyncResult或者M(jìn)apResult對(duì)象,然后將task放到_taskqueue中;調(diào)用*Result.get()方法等待task被worker子進(jìn)程執(zhí)行完成,獲取執(zhí)行結(jié)果。
在知道了multprocess.Pool的實(shí)現(xiàn)邏輯后,現(xiàn)在我們來(lái)探索下,當(dāng)func將異常拋出時(shí),Pool的worker是怎么處理的。下面的代碼是pool.worker工作子進(jìn)程的核心執(zhí)行函數(shù)的簡(jiǎn)化版。
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): ... while xxx: try: task = get() except: ... job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) ... try: put((job, i, result)) except Exception, e: ...
從代碼中可以看到,在執(zhí)行func時(shí),如果func拋出異常,那么worker會(huì)將異常對(duì)象直接放入到_outqueue中,然后等待下一個(gè)task。也就是說(shuō),worker是可以處理異常的。
那么接下來(lái)看看_handle_result線程是怎么處理worker發(fā)過(guò)來(lái)的結(jié)果的。如下所示:
@staticmethod def _handle_results(outqueue, get, cache): while 1: try: task = get() except (IOError, EOFError): return ...
上述代碼為_(kāi)handle_result的主要處理邏輯,可以看到,它只對(duì) IOError, EOFError進(jìn)行了處理,也就是說(shuō),如果在get()時(shí)發(fā)生了其它異常錯(cuò)誤,將導(dǎo)致_handle_result這個(gè)線程直接退出(而事實(shí)上的確如此)。既然_handle_result退出了,那么就沒(méi)有動(dòng)作來(lái)觸發(fā)_cache中*Result對(duì)象釋放信號(hào)量,則用戶的執(zhí)行流程就一直處于wait狀態(tài)。這樣,用戶主進(jìn)程就會(huì)一直卡在get()中,導(dǎo)致主流程執(zhí)行不下去。
我們通過(guò)打開(kāi)multiprocessing庫(kù)的日志(log_to_stderr(level=logging.DEBUG)),然后修改multiprocessing.Pool中_handel_result的代碼,加上一個(gè)except Exception,然后運(yùn)行文章一開(kāi)始的的異常代碼,如下所示:
# multiprocessing : pool.py
#
class Pool(object):
@staticmethod
def _handle_results(outqueue, get, cache):
while 1:
try:
task = get()
except (IOError, EOFError):
return
except Exception:
debug("handle_result not catch Exceptions.")
return
...
控制臺(tái)如果輸出"handle_result not catch Exceptions.",表明_handle_results沒(méi)有catch到所有的異常。而實(shí)際上,真的是由于task = get()這句話拋異常了。
那么,_outqueue.get()方法做了什么。深入查看源碼,發(fā)現(xiàn)get()方法其實(shí)就是os.pipe的read/write方法,但是做了一些處理吧。其內(nèi)部實(shí)現(xiàn)大致如下:
def Pipe(duplex=True): ... fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) # get c2 = _multiprocessing.Connection(fd2, readable=False) # put return c1, c2
_multiprocessing.Connection內(nèi)部使用了C的實(shí)現(xiàn),就不再深入了,否則會(huì)就越來(lái)越復(fù)雜了。它內(nèi)部應(yīng)該使用了pickle庫(kù),在put時(shí)將對(duì)象實(shí)例pickle(也就是序列化吧),然后在get時(shí)將實(shí)例unpikcle,重新生成實(shí)例對(duì)象。具體可查看python官方文檔關(guān)于pickle的介紹(包括object可pickle的條件以及在unpickle時(shí)調(diào)用的方法等)。不管如何,就是實(shí)例在get,即unpickle的過(guò)程出錯(cuò)了。
'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: 'int' object has no attribute 'encode'
從上述錯(cuò)誤日志中可以看到,表明在重構(gòu)時(shí)msg參數(shù)傳入了int類(lèi)型變量。就是說(shuō)在unpickle階段,Mysql Error重新實(shí)例化時(shí)執(zhí)行了__init__()方法,但是傳參錯(cuò)誤了。為了驗(yàn)證這一現(xiàn)象,我將MySql Error的__init__()進(jìn)行簡(jiǎn)化,最終確認(rèn)到self.args的賦值上,即Exception及其子類(lèi)在unpickle時(shí)會(huì)調(diào)用__init__()方法,并將self.args作為參數(shù)列表傳遞給__init__()。
通過(guò)以下代碼可以簡(jiǎn)單的驗(yàn)證問(wèn)題:
import os
from multiprocessing import Pipe
class DemoError(Exception):
def __init__(msg, errno):
print "msg: %s, errno: %s" % (msg, errno)
self.args = ("aa", "bb")
def func():
raise DemoError("demo test", 100)
r, w = Pipe(duplex=False)
try:
result = (True, func(1))
except Exception, e:
result = (False, e)
print "send result"
w.send(result)
print "get result"
res = r.recv()
print "finished."
日志會(huì)在recv調(diào)用時(shí)打印 msg: aa, errno: bb,表明recv異常類(lèi)Exception時(shí)會(huì)將self.args作為參數(shù)傳入init()函數(shù)中。而Mysql的Error類(lèi)重寫(xiě)self.args變量,而且順序不對(duì),導(dǎo)致msg在執(zhí)行編碼時(shí)出錯(cuò)。MySql Error的實(shí)現(xiàn)簡(jiǎn)化如下:
class Error(Exception):
def __init__(self, msg=None, errno=None, values=None, sqlstate=None):
super(Error, self).__init__()
...
if self.msg and self.errno != -1:
fields = {
'errno': self.errno,
'msg': self.msg.encode('utf-8') if PY2 else self.msg
}
...
self.args = (self.errno, self._full_msg, self.sqlstate)
可以看到,mysql Error中的self.args與__init__(msg, errno, values, sqlstate)的順序不一,因此self.args第一個(gè)參數(shù)errno傳給了msg,導(dǎo)致AttributeError。至于self.args是什么,簡(jiǎn)單查了下,是Exception類(lèi)中定義的,一般用__str__或者_(dá)_repr__方法的輸出,python官方文檔不建議overwrite。
總結(jié)
好吧,說(shuō)了這么多,通過(guò)問(wèn)題的追蹤,我們也基本上了解清楚multiprocessing.Pool庫(kù)的實(shí)現(xiàn)了。事實(shí)上,也很難說(shuō)是誰(shuí)的bug,是兩者共同作用下出現(xiàn)的。不管如何,希望在用到multiprocessing庫(kù)時(shí),特別與Pipe相關(guān)時(shí),謹(jǐn)慎點(diǎn)使用,最好的不要讓異常跑到multiprocess中處理,應(yīng)該在func中將所有的異常處理掉,如果有自己定于的異常類(lèi),請(qǐng)最好保證self.args的順序與__init__()的順序一致。同時(shí),網(wǎng)上好像也聽(tīng)說(shuō)使用multprocessing和subprocess庫(kù)出現(xiàn)問(wèn)題,或許也是這個(gè)異常拋出的問(wèn)題,畢竟suprocessError定義與Exception好像有些區(qū)別。
以上這篇解決windows下python3使用multiprocessing.Pool出現(xiàn)的問(wèn)題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
淺談Python3中datetime不同時(shí)區(qū)轉(zhuǎn)換介紹與踩坑
最近的項(xiàng)目需要根據(jù)用戶所屬時(shí)區(qū)制定一些特定策略,學(xué)習(xí)、應(yīng)用了若干python3的時(shí)區(qū)轉(zhuǎn)換相關(guān)知識(shí),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
Django開(kāi)發(fā)RESTful API實(shí)現(xiàn)增刪改查(入門(mén)級(jí))
這篇文章主要介紹了Django開(kāi)發(fā)RESTful API實(shí)現(xiàn)增刪改查(入門(mén)級(jí)),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05
Python如何將JavaScript轉(zhuǎn)換為json
文章介紹了如何使用Python的re模塊將JavaScript代碼轉(zhuǎn)換為JSON格式,首先,使用正則表達(dá)式匹配并替換JavaScript代碼中的數(shù)字,確保它們被雙引號(hào)括起來(lái),然后,使用另一個(gè)正則表達(dá)式匹配并替換JavaScript代碼中的鍵值對(duì),確保鍵和值都被雙引號(hào)括起來(lái)2025-02-02
Python3多線程基礎(chǔ)知識(shí)點(diǎn)
在本篇內(nèi)容里小編給大家分享了關(guān)于Python3多線程基礎(chǔ)知識(shí)點(diǎn)內(nèi)容,需要的朋友們跟著學(xué)習(xí)參考下。2019-02-02
python3模塊smtplib實(shí)現(xiàn)發(fā)送郵件功能
這篇文章主要為大家詳細(xì)介紹了python3模塊smtplib實(shí)現(xiàn)發(fā)送郵件功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05
python使用PythonMagick將jpg圖片轉(zhuǎn)換成ico圖片的方法
這篇文章主要介紹了python使用PythonMagick將jpg圖片轉(zhuǎn)換成ico圖片的方法,涉及PythonMagick模塊操作圖片的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03
Python循環(huán)中else,break和continue的用法實(shí)例詳解
這篇文章主要介紹了Python循環(huán)中else,break和continue的用法,結(jié)合實(shí)例形式詳細(xì)分析了Python for循環(huán)、while循環(huán)中else,break和continue的功能、用法及相關(guān)操作注意事項(xiàng),需要的朋友可以參考下2019-07-07

