Solving the exception capture problem in Python's ThreadPoolExecutor thread pool
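Problem
I recently wrote a Python script that uses threads and a thread pool. While running it I noticed something interesting: a worker in the thread pool hit a problem and raised an exception, but the main thread never caught it. Until I found the bug, I assumed the thread pool code had returned normally.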
The key point first
The point of this post: when a worker in Python's concurrent.futures.ThreadPoolExecutor raises an exception, the exception is not propagated upward on its own. The main thread has to fetch it actively by calling concurrent.futures.Future.exception(timeout=None).
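As a minimal sketch of this behaviour (failing_task is a hypothetical worker invented for illustration, not part of the original script), the exception stays on the Future object until the caller asks for it:

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    # Hypothetical worker that always fails.
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns at once; nothing is raised in the calling thread.
    future = pool.submit(failing_task)

# The exception is stored on the Future until someone asks for it.
error = future.exception()  # blocks until the task has finished
print(type(error).__name__, error)
```

If the returned Future is discarded and never inspected, the failure goes unnoticed, which is what happened in the script described here.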
Reproducing and solving the problem
Background
The problem started with code like this:
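import logging
import threading
from time import sleep

logging.basicConfig(level=logging.INFO, format="thread: %(threadName)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    logger.info("I am slave. I am working. I am going to sleep 3s")
    sleep(3)
    logger.info("Exit thread executor")


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")
        try:
            # The worker thread has ended because of some exception. We want to
            # restart its work without creating a new thread.
            thread_obj.start()  # This line raises: the same thread cannot be started twice
        except Exception as e:
            logger.error("Master start thread error", exc_info=True)
            raise e
        logger.info("Master is going to sleep 5s")
        sleep(5)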
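As the comment explains, the code above is meant to work like a producer/consumer pair: the worker thread keeps producing resources and the main thread consumes them. The worker thread has exited because of an exception, and the main thread tries to restart production. Clearly, this code raises an error.
Output: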
thread: MainThread [INFO] Master starts thread worker
thread: Thread-1 [INFO] I am slave. I am working. I am going to sleep 3s
thread: MainThread [INFO] Master is going to sleep 5s
thread: Thread-1 [INFO] Exit thread executor because of some exception
thread: MainThread [INFO] Master starts thread worker
thread: MainThread [ERROR] Master start thread error
Traceback (most recent call last):
  File "xxx.py", line 47, in main
    thread_obj.start()
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
Traceback (most recent call last):
  File "xxx.py", line 56, in <module>
    main()
  File "xxx.py", line 50, in main
    raise e
  File "xxx.py", line 47, in main
    thread_obj.start()
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
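The RuntimeError in the traceback comes from the second start() call on the same Thread object. The sketch below is only an illustration (worker, run_worker_once and results are hypothetical names): it reproduces the error in isolation and shows the plain alternative of building a new Thread object for each run, which is the thing the original script wanted to avoid.

```python
import threading


def worker(results):
    # Record which thread ran the work.
    results.append(threading.current_thread().name)


def run_worker_once(results):
    # A Thread object is single-use, so build a new one for every run.
    thread = threading.Thread(target=worker, args=(results,))
    thread.start()
    thread.join()


results = []
for _ in range(3):
    run_worker_once(results)

# Calling start() again on a finished Thread object raises RuntimeError.
finished = threading.Thread(target=worker, args=(results,))
finished.start()
finished.join()
restart_error = None
try:
    finished.start()
except RuntimeError as exc:
    restart_error = str(exc)

print(len(results), restart_error)
```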
Getting to the point
However, the script has other business logic to run, so the produce/consume code above has to run in a thread of its own. A thread pool is introduced to run it:
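import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")
        # The worker thread has ended because of some exception. We want to
        # restart its work without creating a new thread.
        # I did not expect an exception here.
        thread_obj.start()  # This line raises: the same thread cannot be started twice
        logger.info("Master is going to sleep 5s")
        sleep(5)


def thread_pool_main():
    thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
    logger.info("Master ThreadPool Executor starts thread worker")
    # The returned Future is discarded, so any exception raised in main() is never seen.
    thread_obj.submit(main)
    while True:
        logger.info("Master ThreadPool Executor is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    thread_pool_main()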
The output is as follows:
INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...
Clearly, from the output above, an exception must occur when the pool worker reaches INFO [thread: WorkExecutor_0] Master starts thread worker for the second time, yet the program as a whole does not raise anything.
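One way to confirm that the exception exists but is only parked on the Future is to keep the return value of submit() and call result() on it, which re-raises the worker's exception in the calling thread. The sketch below is a reduced stand-in for the script above; restart_finished_thread and caught are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def restart_finished_thread():
    thread = threading.Thread(target=lambda: None)
    thread.start()
    thread.join()
    thread.start()  # second start() raises RuntimeError inside the pool worker


pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(restart_finished_thread)

caught = None
try:
    # result() waits for the task and re-raises whatever the task raised.
    future.result(timeout=5)
except RuntimeError as exc:
    caught = exc
finally:
    pool.shutdown()

print(repr(caught))
```

Blocking on result() suits short tasks. For a long-running worker like the one in this post, a done callback avoids blocking the main thread.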
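Solution
After finding the bug above, I wanted to write the exception to the log whenever a thread pool worker fails. According to the documentation, the exception from a pool task is obtained by calling concurrent.futures.Future.exception(timeout=None). To log it, a callback that runs when the pool task finishes is added here. The exception is written to the log with logging.exception().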
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")
        # The worker thread has ended because of some exception. We want to
        # restart its work without creating a new thread.
        # I did not expect an exception here.
        thread_obj.start()  # This line raises: the same thread cannot be started twice
        logger.info("Master is going to sleep 5s")
        sleep(5)


def thread_pool_callback(worker):
    # Called with the finished Future as its only argument.
    logger.info("called thread pool executor callback function")
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker return exception: {}".format(worker_exception))


def thread_pool_main():
    thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
    logger.info("Master ThreadPool Executor starts thread worker")
    thread_pool_exc = thread_obj.submit(main)
    thread_pool_exc.add_done_callback(thread_pool_callback)
    # logger.info("thread pool exception: {}".format(thread_pool_exc.exception()))
    while True:
        logger.info("Master ThreadPool Executor is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    thread_pool_main()
Output:
INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: WorkExecutor_0] called thread pool executor callback function
ERROR [thread: WorkExecutor_0] Worker return exception: threads can only be started once
Traceback (most recent call last):
  File "E:\anaconda\lib\concurrent\futures\thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "xxxx.py", line 46, in main
    thread_obj.start()  # This line raises: the same thread cannot be started twice
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...
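The traceback appears in this log because, in CPython, the executor runs the callback while it is still handling the worker's exception, which is where logging.exception() looks for traceback information. A callback attached after the task has already finished runs immediately in the calling thread, where no exception is being handled. As a hedged variant, the sketch below passes the exception instance through exc_info (accepted by logging since Python 3.5) so the traceback is logged in both cases. The names log_future_error, failing_task and pool_demo are hypothetical.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger("pool_demo")  # hypothetical logger name


def log_future_error(future):
    # exception() raises CancelledError on a cancelled future, so check first.
    if future.cancelled():
        logger.warning("Task was cancelled")
        return
    error = future.exception()
    if error is not None:
        # Passing the instance lets logging format the traceback it carries.
        logger.error("Worker return exception: %s", error, exc_info=error)


def failing_task():
    raise RuntimeError("threads can only be started once")


with ThreadPoolExecutor(max_workers=1) as pool:
    done_future = pool.submit(failing_task)

# The future has already finished, so the callback runs right here in the main thread.
done_future.add_done_callback(log_future_error)
```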
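The final version
In fact, the approach above, reusing a single thread object to implement the producer thread, is flawed. Generally, once a thread finishes running, its resources are reclaimed by the operating system, so start() cannot be called on the same thread again.

One workable approach is to use a thread pool instead. Of course, this requires attention to the exception capture problem described above for code that runs in the pool.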
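import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

logging.basicConfig(level=logging.INFO, format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def executor_callback(worker):
    # Called with the finished Future as its only argument.
    logger.info("called worker callback function")
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker return exception: {}".format(worker_exception))
        # raise worker_exception


def main():
    slave_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor")
    restart_flag = False
    while True:
        logger.info("Master starts thread worker")
        # The flag is flipped on the first pass, so this demo submits the worker
        # only once; reset it to False whenever the worker must be resubmitted.
        if not restart_flag:
            restart_flag = not restart_flag
            logger.info("Restart Slave work")
            slave_thread_pool.submit(thread_executor).add_done_callback(executor_callback)
        logger.info("Master is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    main()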
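In the listing above the flag lets the worker be submitted only once. If the goal is to resubmit the worker each time it exits, one possible sketch is to keep the Future and check done() on every pass. This is an assumption about the intended behaviour, not the author's code: flaky_worker, supervise, max_runs and poll_interval are hypothetical, and the loop is bounded so that the example terminates.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

logger = logging.getLogger("supervisor_demo")  # hypothetical logger name


def flaky_worker(run_id):
    # Hypothetical worker that fails on every run.
    raise RuntimeError(f"worker run {run_id} failed")


def supervise(max_runs=3, poll_interval=0.05):
    """Resubmit the worker each time it finishes, up to max_runs runs."""
    errors = []
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor") as pool:
        runs = 1
        future = pool.submit(flaky_worker, runs)
        while True:
            if future.done():
                error = future.exception()
                if error is not None:
                    logger.error("Worker failed: %s", error, exc_info=error)
                    errors.append(error)
                if runs == max_runs:
                    break
                runs += 1
                # The pool reuses its own thread, so resubmitting is allowed.
                future = pool.submit(flaky_worker, runs)
            sleep(poll_interval)
    return errors


errors = supervise()
print([str(e) for e in errors])
```

Polling done() keeps the restart decision in the main loop. A done callback could set a flag instead, at the cost of sharing state between threads.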
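Summary
This problem came mainly from not knowing Python's concurrent.futures.ThreadPoolExecutor well enough. I first met this package in a book, but the book did not cover the whole API and its usage, so when the code hit an exception I spent a long time debugging before finding the real cause. Only after reading the Python docs did I get a complete picture of how to use it. So when learning a new Python package, it is still worth checking the official documentation.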
References
English: Python docs, concurrent.futures
Chinese: Python docs, concurrent.futures: Launching parallel tasks
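exception(timeout=None)
Return the exception raised by the call. If the call hasn't yet completed, this method waits up to timeout seconds. If the call hasn't completed within timeout seconds, concurrent.futures.TimeoutError is raised. timeout can be an int or a float. If timeout is not specified or is None, there is no limit to the wait time.
If the future is cancelled before completing, CancelledError is raised.
If the call completed without raising, None is returned.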
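The timeout behaviour described above can be sketched as follows. slow_task and the chosen durations are hypothetical and only serve to force a timeout on the first call.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from time import sleep


def slow_task():
    # Hypothetical task that fails only after half a second.
    sleep(0.5)
    raise ValueError("late failure")


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task)
    try:
        # The task is still running after 0.1 s, so this call times out.
        future.exception(timeout=0.1)
        timed_out = False
    except FutureTimeoutError:
        timed_out = True
    # Without a timeout the call waits until the task has finished.
    final_error = future.exception()

print(timed_out, repr(final_error))
```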
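add_done_callback(fn)
Attaches the callable fn to the future. fn is called, with the future as its only argument, when the future is cancelled or finishes running.
Added callables are called in the order in which they were added, and always in a thread belonging to the process that added them. If the callable raises an Exception subclass, it is logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.
If the future has already completed or been cancelled, fn is called immediately.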
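A short sketch of the ordering and error handling described above. The callbacks first, broken and last are hypothetical, and a threading.Event holds the task back until all callbacks are registered, so that every callback runs from the worker thread in registration order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []
callbacks_registered = threading.Event()


def task():
    # Wait until the main thread has attached every callback.
    callbacks_registered.wait(timeout=5)
    return 6


def first(future):
    calls.append(("first", future.result()))


def broken(future):
    # Logged by concurrent.futures and then ignored.
    raise ValueError("callback bug")


def last(future):
    calls.append(("last", future.result()))


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(task)
    for callback in (first, broken, last):
        future.add_done_callback(callback)
    callbacks_registered.set()

print(calls)
```

The failure in broken is reported through the logging module and does not stop last from running.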