Python快速實(shí)現(xiàn)一個(gè)線程池的示例代碼
楔子
當(dāng)有多個(gè) IO 密集型的任務(wù)要被處理時(shí),我們自然而然會(huì)想到多線程。但如果任務(wù)非常多,我們不可能每一個(gè)任務(wù)都啟動(dòng)一個(gè)線程去處理,這個(gè)時(shí)候最好的辦法就是實(shí)現(xiàn)一個(gè)線程池,至于池子里面的線程數(shù)量可以根據(jù)業(yè)務(wù)場(chǎng)景進(jìn)行設(shè)置。
比如我們實(shí)現(xiàn)一個(gè)有 10 個(gè)線程的線程池,這樣可以并發(fā)地處理 10 個(gè)任務(wù),每個(gè)線程將任務(wù)執(zhí)行完之后,便去執(zhí)行下一個(gè)任務(wù)。通過(guò)使用線程池,可以避免因線程創(chuàng)建過(guò)多而導(dǎo)致資源耗盡,而且任務(wù)在執(zhí)行時(shí)的生命周期也可以很好地把控。
而線程池的實(shí)現(xiàn)方式也很簡(jiǎn)單,但這里我們不打算手動(dòng)實(shí)現(xiàn),因?yàn)?Python 提供了一個(gè)標(biāo)準(zhǔn)庫(kù) concurrent.futures,已經(jīng)內(nèi)置了對(duì)線程池的支持。所以本篇文章,我們就來(lái)詳細(xì)介紹一下該模塊的用法。
Future 對(duì)象
當(dāng)我們往線程池里面提交一個(gè)函數(shù)時(shí),會(huì)分配一個(gè)線程去執(zhí)行,同時(shí)立即返回一個(gè) Future 對(duì)象。通過(guò) Future 對(duì)象可以監(jiān)控函數(shù)的執(zhí)行狀態(tài),有沒(méi)有出現(xiàn)異常,以及有沒(méi)有執(zhí)行完畢等等。如果函數(shù)執(zhí)行完畢,內(nèi)部便會(huì)調(diào)用 future.set_result 將返回值設(shè)置到 future 里面,然后外界便可調(diào)用 future.result 拿到返回值。
除此之外 future 還可以綁定回調(diào),一旦函數(shù)執(zhí)行完畢,就會(huì)以 future 為參數(shù),自動(dòng)觸發(fā)回調(diào)。所以 future 被稱為未來(lái)對(duì)象,可以把它理解為函數(shù)的一個(gè)容器,當(dāng)我們往線程池提交一個(gè)函數(shù)時(shí),會(huì)立即創(chuàng)建相應(yīng)的 future 然后返回。函數(shù)的執(zhí)行狀態(tài)什么的,都通過(guò) future 來(lái)查看,當(dāng)然也可以給它綁定一個(gè)回調(diào),在函數(shù)執(zhí)行完畢時(shí)自動(dòng)觸發(fā)。
那么下面我們就來(lái)看一下 future 的用法,文字的話理解起來(lái)可能有點(diǎn)枯燥。
"""
將函數(shù)提交到線程池里面運(yùn)行時(shí),會(huì)立即返回一個(gè)對(duì)象
這個(gè)對(duì)象就叫做?Future?對(duì)象,里面包含了函數(shù)的執(zhí)行狀態(tài)等等
當(dāng)然我們也可以手動(dòng)創(chuàng)建一個(gè)Future對(duì)象。
"""
from?concurrent.futures?import?Future
#?創(chuàng)建?Future?對(duì)象?future
future?=?Future()
#?給?future?綁定回調(diào)
def?callback(f:?Future):
????print("當(dāng)set_result的時(shí)候會(huì)執(zhí)行回調(diào),result:",
??????????f.result())
future.add_done_callback(callback)
#?通過(guò)?add_done_callback?方法即可給?future?綁定回調(diào)
#?調(diào)用的時(shí)候會(huì)自動(dòng)將?future?作為參數(shù)
#?如果需要多個(gè)參數(shù),那么就使用偏函數(shù)
#?回調(diào)函數(shù)什么時(shí)候執(zhí)行呢?
#?顯然是當(dāng)?future?執(zhí)行?set_result?的時(shí)候
#?如果?future?是向線程池提交函數(shù)時(shí)返回的
#?那么當(dāng)函數(shù)執(zhí)行完畢時(shí)會(huì)自動(dòng)執(zhí)行?future.set_result(xx)
#?并將自身的返回設(shè)置進(jìn)去
#?而這里的?future?是我們手動(dòng)創(chuàng)建的,因此需要手動(dòng)執(zhí)行
future.set_result("嘿嘿")
"""
當(dāng)set_result的時(shí)候會(huì)執(zhí)行回調(diào),result:?嘿嘿
"""需要注意的是:只能執(zhí)行一次 set_result,但是可以多次調(diào)用 result 獲取結(jié)果。
from?concurrent.futures?import?Future
future?=?Future()
future.set_result("哼哼")
print(future.result())??#?哼哼
print(future.result())??#?哼哼
print(future.result())??#?哼哼執(zhí)行 future.result() 之前一定要先 set_result,否則會(huì)一直處于阻塞狀態(tài)。當(dāng)然 result 方法還可以接收一個(gè) timeout 參數(shù),表示超時(shí)時(shí)間,如果在指定時(shí)間內(nèi)沒(méi)有獲取到值就會(huì)拋出異常。
提交函數(shù)自動(dòng)創(chuàng)建 Future 對(duì)象
我們上面是手動(dòng)創(chuàng)建的 Future 對(duì)象,但工作中很少會(huì)手動(dòng)創(chuàng)建。我們將函數(shù)提交到線程池里面運(yùn)行的時(shí)候,會(huì)自動(dòng)創(chuàng)建 Future 對(duì)象并返回。這個(gè) Future 對(duì)象里面就包含了函數(shù)的執(zhí)行狀態(tài),比如此時(shí)是處于暫停、運(yùn)行中還是完成等等,并且函數(shù)在執(zhí)行完畢之后,還會(huì)調(diào)用 future.set_result 將自身的返回值設(shè)置進(jìn)去。
from?concurrent.futures?import?ThreadPoolExecutor
import?time
def?task(name,?n):
????time.sleep(n)
????return?f"{name}?睡了?{n}?秒"
#?創(chuàng)建一個(gè)線程池
#?里面還可以指定?max_workers?參數(shù),表示最多創(chuàng)建多少個(gè)線程
#?如果不指定,那么每提交一個(gè)函數(shù),都會(huì)為其創(chuàng)建一個(gè)線程
executor?=?ThreadPoolExecutor()
#?通過(guò)?submit?即可將函數(shù)提交到線程池,一旦提交,就會(huì)立刻運(yùn)行
#?因?yàn)殚_(kāi)啟了一個(gè)新的線程,主線程會(huì)繼續(xù)往下執(zhí)行
#?至于?submit?的參數(shù),按照函數(shù)名,對(duì)應(yīng)參數(shù)提交即可
#?切記不可寫(xiě)成task("古明地覺(jué)",?3),這樣就變成調(diào)用了
future?=?executor.submit(task,?"古明地覺(jué)",?3)
#?由于函數(shù)里面出現(xiàn)了?time.sleep,并且指定的?n?是?3
#?所以函數(shù)內(nèi)部會(huì)休眠?3?秒,顯然此時(shí)處于運(yùn)行狀態(tài)
print(future)
"""
<Future?at?0x7fbf701726d0?state=running>
"""
#?我們說(shuō)?future?相當(dāng)于一個(gè)容器,包含了內(nèi)部函數(shù)的執(zhí)行狀態(tài)
#?函數(shù)是否正在運(yùn)行中
print(future.running())
"""
True
"""
#?函數(shù)是否執(zhí)行完畢
print(future.done())
"""
False
"""
#?主程序也?sleep?3?秒
time.sleep(3)
#?顯然此時(shí)函數(shù)已經(jīng)執(zhí)行完畢了
#?并且打印結(jié)果還告訴我們返回值類(lèi)型是?str
print(future)
"""
<Future?at?0x7fbf701726d0?state=finished?returned?str>
"""
print(future.running())
"""
False
"""
print(future.done())
"""
True
"""
#?函數(shù)執(zhí)行完畢時(shí),會(huì)將返回值設(shè)置在?future?里
#?也就是說(shuō)一旦執(zhí)行了?future.set_result
#?那么就表示函數(shù)執(zhí)行完畢了,然后外界可以調(diào)用?result?拿到返回值
print(future.result())
"""
古明地覺(jué)?睡了?3?秒
"""這里再?gòu)?qiáng)調(diào)一下 future.result(),這一步是會(huì)阻塞的,舉個(gè)例子:
#?提交函數(shù) future?=?executor.submit(task,?"古明地覺(jué)",?3) start?=?time.perf_counter() future.result() end?=?time.perf_counter() print(end?-?start)??#?3.00331525
可以看到,future.result() 這一步花了將近 3s。其實(shí)也不難理解,future.result() 是干嘛的?就是為了獲取函數(shù)的返回值,可函數(shù)都還沒(méi)有執(zhí)行完畢,它又從哪里獲取呢?所以只能先等待函數(shù)執(zhí)行完畢,將返回值通過(guò) set_result 設(shè)置到 future 里面之后,外界才能調(diào)用 future.result() 獲取到值。
如果不想一直等待的話,那么在獲取值的時(shí)候可以傳入一個(gè)超時(shí)時(shí)間。
from?concurrent.futures?import?(
????ThreadPoolExecutor,
????TimeoutError
)
import?time
def?task(name,?n):
????time.sleep(n)
????return?f"{name}?睡了?{n}?秒"
executor?=?ThreadPoolExecutor()
future?=?executor.submit(task,?"古明地覺(jué)",?3)
try:
????#?1?秒之內(nèi)獲取不到值,拋出?TimeoutError
????res?=?future.result(1)
except?TimeoutError:
????pass
#?再?sleep?2?秒,顯然函數(shù)執(zhí)行完畢了
time.sleep(2)
#?獲取返回值
print(future.result())
"""
古明地覺(jué)?睡了?3?秒
"""當(dāng)然啦,這么做其實(shí)還不夠智能,因?yàn)槲覀儾恢篮瘮?shù)什么時(shí)候執(zhí)行完畢。所以最好的辦法還是綁定一個(gè)回調(diào),當(dāng)函數(shù)執(zhí)行完畢時(shí),自動(dòng)觸發(fā)回調(diào)。
from?concurrent.futures?import?ThreadPoolExecutor
import?time
def?task(name,?n):
????time.sleep(n)
????return?f"{name}?睡了?{n}?秒"
def?callback(f):
????print(f.result())
executor?=?ThreadPoolExecutor()
future?=?executor.submit(task,?"古明地覺(jué)",?3)
#?綁定回調(diào),3?秒之后自動(dòng)調(diào)用
future.add_done_callback(callback)
"""
古明地覺(jué)?睡了?3?秒
"""需要注意的是,在調(diào)用 submit 方法之后,提交到線程池的函數(shù)就已經(jīng)開(kāi)始執(zhí)行了。而不管函數(shù)有沒(méi)有執(zhí)行完畢,我們都可以給對(duì)應(yīng)的 future 綁定回調(diào)。
如果函數(shù)完成之前添加回調(diào),那么會(huì)在函數(shù)完成后觸發(fā)回調(diào)。如果函數(shù)完成之后添加回調(diào),由于函數(shù)已經(jīng)完成,代表此時(shí)的 future 已經(jīng)有值了,或者說(shuō)已經(jīng) set_result 了,那么會(huì)立即觸發(fā)回調(diào)。
future.set_result 到底干了什么事情
當(dāng)函數(shù)執(zhí)行完畢之后,會(huì)執(zhí)行 set_result,那么這個(gè)方法到底干了什么事情呢?

我們看到 future 有兩個(gè)被保護(hù)的屬性,分別是 _result 和 _state。顯然 _result 用于保存函數(shù)的返回值,而 future.result() 本質(zhì)上也是返回 _result 屬性的值。而 _state 屬性則用于表示函數(shù)的執(zhí)行狀態(tài),初始為 PENDING,執(zhí)行中為 RUNING,執(zhí)行完畢時(shí)被設(shè)置為 FINISHED。
調(diào)用 future.result() 的時(shí)候,會(huì)判斷 _state 的屬性,如果還在執(zhí)行中就一直等待。當(dāng) _state 為 FINISHED 的時(shí)候,就返回 _result 屬性的值。
提交多個(gè)函數(shù)
我們上面每次只提交了一個(gè)函數(shù),但其實(shí)可以提交任意多個(gè),我們來(lái)看一下:
from?concurrent.futures?import?ThreadPoolExecutor
import?time
def?task(name,?n):
????time.sleep(n)
????return?f"{name}?睡了?{n}?秒"
executor?=?ThreadPoolExecutor()
futures?=?[executor.submit(task,?"古明地覺(jué)",?3),
???????????executor.submit(task,?"古明地覺(jué)",?4),
???????????executor.submit(task,?"古明地覺(jué)",?1)]
#?此時(shí)都處于running
print(futures)
"""
[<Future?at?0x1b5ff622550?state=running>,
?<Future?at?0x1b5ff63ca60?state=running>,?
?<Future?at?0x1b5ff63cdf0?state=running>]
"""
time.sleep(3)
#?主程序?sleep?3s?后
#?futures[0]和futures[2]處于?finished
#?futures[1]仍處于?running
print(futures)
"""
[<Future?at?0x1b5ff622550?state=running>,?
?<Future?at?0x1b5ff63ca60?state=running>,?
?<Future?at?0x1b5ff63cdf0?state=finished?returned?str>]
"""如果是多個(gè)函數(shù),要如何拿到返回值呢?很簡(jiǎn)單,遍歷 futures 即可。
executor?=?ThreadPoolExecutor() futures?=?[executor.submit(task,?"古明地覺(jué)",?5), ???????????executor.submit(task,?"古明地覺(jué)",?2), ???????????executor.submit(task,?"古明地覺(jué)",?4), ???????????executor.submit(task,?"古明地覺(jué)",?3), ???????????executor.submit(task,?"古明地覺(jué)",?6)] for?future?in?futures: ????print(future.result()) """ 古明地覺(jué)?睡了?5?秒 古明地覺(jué)?睡了?2?秒 古明地覺(jué)?睡了?4?秒 古明地覺(jué)?睡了?3?秒 古明地覺(jué)?睡了?6?秒 """
這里面有一些值得說(shuō)一說(shuō)的地方,首先 futures 里面有 5 個(gè) future,記做 future1, future2, future3, future4, future5。
當(dāng)使用 for 循環(huán)遍歷的時(shí)候,實(shí)際上會(huì)依次遍歷這 5 個(gè) future,所以返回值的順序就是我們添加的函數(shù)的順序。由于 future1 對(duì)應(yīng)的函數(shù)休眠了 5s,那么必須等到 5s 后,future1 里面才會(huì)有值。
但這五個(gè)函數(shù)是并發(fā)執(zhí)行的,future2, future3, future4 由于只休眠了 2s, 4s, 3s,所以肯定會(huì)先執(zhí)行完畢,然后執(zhí)行 set_result,將返回值設(shè)置到對(duì)應(yīng)的 future 里。
但 Python 的 for 循環(huán)不可能在第一次迭代還沒(méi)有結(jié)束,就去執(zhí)行第二次迭代。因?yàn)?futures 里面的幾個(gè) future 的順序已經(jīng)一開(kāi)始就被定好了,只有當(dāng)?shù)谝粋€(gè) future.result() 執(zhí)行完成之后,才會(huì)執(zhí)行第二個(gè) future.result(),以及第三個(gè)、第四個(gè)。
因此即便后面的函數(shù)已經(jīng)執(zhí)行完畢,但由于 for 循環(huán)的順序,也只能等著,直到前面的 future.result() 執(zhí)行完畢。所以當(dāng)?shù)谝粋€(gè) future.result() 結(jié)束時(shí),后面三個(gè) future.result() 會(huì)立刻輸出,因?yàn)樗鼈儍?nèi)部的函數(shù)已經(jīng)執(zhí)行結(jié)束了。
而最后一個(gè) future,由于內(nèi)部函數(shù) sleep 了 6 秒,因此要再等待 1 秒,才會(huì)打印 future.result()。
使用 map 來(lái)提交多個(gè)函數(shù)
使用 submit 提交函數(shù)會(huì)返回一個(gè) future,并且還可以給 future 綁定一個(gè)回調(diào)。但如果不關(guān)心回調(diào)的話,那么還可以使用 map 進(jìn)行提交。
executor?=?ThreadPoolExecutor() #?map?內(nèi)部也是使用了?submit results?=?executor.map(task, ???????????????????????["古明地覺(jué)"]?*?3, ???????????????????????[3,?1,?2]) #?并且返回的是迭代器 print(results) """ <generator?object?...?at?0x0000022D78EFA970> """ #?此時(shí)遍歷得到的是不再是?future #?而是?future.result() for?result?in?results: ????print(result) """ 古明地覺(jué)?睡了?3?秒 古明地覺(jué)?睡了?1?秒 古明地覺(jué)?睡了?2?秒 """
可以看到,當(dāng)使用for循環(huán)的時(shí)候,map 執(zhí)行的邏輯和 submit 是一樣的。唯一的區(qū)別是,此時(shí)不需要再調(diào)用 result 了,因?yàn)榉祷氐木褪呛瘮?shù)的返回值。
或者我們直接調(diào)用 list 也行。
executor?=?ThreadPoolExecutor() results?=?executor.map(task, ???????????????????????["古明地覺(jué)"]?*?3, ???????????????????????[3,?1,?2]) print(list(results)) """ ['古明地覺(jué)?睡了?3?秒',? ?'古明地覺(jué)?睡了?1?秒',? ?'古明地覺(jué)?睡了?2?秒'] """
results 是一個(gè)生成器,調(diào)用 list 的時(shí)候會(huì)將里面的值全部產(chǎn)出。由于 map 內(nèi)部還是使用的 submit,然后通過(guò) future.result() 拿到返回值,而耗時(shí)最長(zhǎng)的函數(shù)需要 3 秒,因此這一步會(huì)阻塞 3 秒。3 秒過(guò)后,會(huì)打印所有函數(shù)的返回值。
按照順序等待執(zhí)行
上面在獲取返回值的時(shí)候,是按照函數(shù)的提交順序獲取的。如果我希望哪個(gè)函數(shù)先執(zhí)行完畢,就先獲取哪個(gè)函數(shù)的返回值,該怎么做呢?
from?concurrent.futures?import?(
????ThreadPoolExecutor,
????as_completed
)
import?time
def?task(name,?n):
????time.sleep(n)
????return?f"{name}?睡了?{n}?秒"
executor?=?ThreadPoolExecutor()
futures?=?[executor.submit(task,?"古明地覺(jué)",?5),
???????????executor.submit(task,?"古明地覺(jué)",?2),
???????????executor.submit(task,?"古明地覺(jué)",?1),
???????????executor.submit(task,?"古明地覺(jué)",?3),
???????????executor.submit(task,?"古明地覺(jué)",?4)]
for?future?in?as_completed(futures):
????print(future.result())
"""
古明地覺(jué)?睡了?1?秒
古明地覺(jué)?睡了?2?秒
古明地覺(jué)?睡了?3?秒
古明地覺(jué)?睡了?4?秒
古明地覺(jué)?睡了?5?秒
"""此時(shí)誰(shuí)先完成,誰(shuí)先返回。
取消一個(gè)函數(shù)的執(zhí)行
我們通過(guò) submit 可以將函數(shù)提交到線程池中執(zhí)行,但如果我們想取消該怎么辦呢?
executor?=?ThreadPoolExecutor() future1?=?executor.submit(task,?"古明地覺(jué)",?1) future2?=?executor.submit(task,?"古明地覺(jué)",?2) future3?=?executor.submit(task,?"古明地覺(jué)",?3) #?取消函數(shù)的執(zhí)行 #?會(huì)將?future?的?_state?屬性設(shè)置為?CANCELLED future3.cancel() #?查看是否被取消 print(future3.cancelled())??#?False
問(wèn)題來(lái)了,調(diào)用 cancelled 方法的時(shí)候,返回的是False,這是為什么?很簡(jiǎn)單,因?yàn)楹瘮?shù)已經(jīng)被提交到線程池里面了,函數(shù)已經(jīng)運(yùn)行了。而只有在還沒(méi)有運(yùn)行時(shí),取消才會(huì)成功。
可這不矛盾了嗎?函數(shù)一旦提交就會(huì)運(yùn)行,只有不運(yùn)行才會(huì)取消成功,這怎么辦?還記得線程池的一個(gè)叫做 max_workers 的參數(shù)嗎?用來(lái)控制線程池內(nèi)的線程數(shù)量,我們可以將最大的線程數(shù)設(shè)置為2,那么當(dāng)?shù)谌齻€(gè)函數(shù)進(jìn)去的時(shí)候,就不會(huì)執(zhí)行了,而是處于暫停狀態(tài)。
executor?=?ThreadPoolExecutor(max_workers=2) future1?=?executor.submit(task,?"古明地覺(jué)",?1) future2?=?executor.submit(task,?"古明地覺(jué)",?2) future3?=?executor.submit(task,?"古明地覺(jué)",?3) #?如果池子里可以創(chuàng)建空閑線程 #?那么函數(shù)一旦提交就會(huì)運(yùn)行,狀態(tài)為?RUNNING print(future1._state)??#?RUNNING print(future2._state)??#?RUNNING #?但?future3?內(nèi)部的函數(shù)還沒(méi)有運(yùn)行 #?因?yàn)槌刈永餆o(wú)法創(chuàng)建新的空閑線程了,所以狀態(tài)為?PENDING print(future3._state)??#?PENDING #?取消函數(shù)的執(zhí)行,前提是函數(shù)沒(méi)有運(yùn)行 #?會(huì)將?future?的?_state?屬性設(shè)置為?CANCELLED future3.cancel() #?查看是否被取消 print(future3.cancelled())??#?True print(future3._state)??#?CANCELLED
在啟動(dòng)線程池的時(shí)候,肯定是需要設(shè)置容量的,不然處理幾千個(gè)函數(shù)要開(kāi)啟幾千個(gè)線程嗎。另外當(dāng)函數(shù)被取消了,就不可以再調(diào)用 future.result() 了,否則的話會(huì)拋出 CancelledError。
函數(shù)執(zhí)行時(shí)出現(xiàn)異常
我們前面的邏輯都是函數(shù)正常執(zhí)行的前提下,但天有不測(cè)風(fēng)云,如果函數(shù)執(zhí)行時(shí)出現(xiàn)異常了該怎么辦?
from?concurrent.futures?import?ThreadPoolExecutor def?task1(): ????1?/?0 def?task2(): ????pass executor?=?ThreadPoolExecutor(max_workers=2) future1?=?executor.submit(task1) future2?=?executor.submit(task2) print(future1) print(future2) """ <Future?at?0x7fe3e00f9e50?state=finished?raised?ZeroDivisionError> <Future?at?0x7fe3e00f9eb0?state=finished?returned?NoneType> """ #?結(jié)果顯示?task1?函數(shù)執(zhí)行出現(xiàn)異常了 #?那么這個(gè)異常要怎么獲取呢? print(future1.exception()) print(future1.exception().__class__) """ division?by?zero <class?'ZeroDivisionError'> """ #?如果執(zhí)行沒(méi)有出現(xiàn)異常,那么?exception?方法返回?None print(future2.exception())??#?None #?注意:如果函數(shù)執(zhí)行出現(xiàn)異常了 #?那么調(diào)用?result?方法會(huì)將異常拋出來(lái) future1.result() """ Traceback?(most?recent?call?last): ??File?"...",?line?4,?in?task1 ????1?/?0 ZeroDivisionError:?division?by?zero """
出現(xiàn)異常時(shí),調(diào)用 future.set_exception 將異常設(shè)置到 future 里面,而 future 有一個(gè) _exception 屬性,專(zhuān)門(mén)保存設(shè)置的異常。當(dāng)調(diào)用 future.exception() 時(shí),也會(huì)直接返回 _exception 屬性的值。
等待所有函數(shù)執(zhí)行完畢
假設(shè)我們往線程池提交了很多個(gè)函數(shù),如果希望提交的函數(shù)都執(zhí)行完畢之后,主程序才能往下執(zhí)行,該怎么辦呢?其實(shí)方案有很多:
第一種:
from?concurrent.futures?import?ThreadPoolExecutor
import?time
def?task(n):
????time.sleep(n)
????return?f"sleep?{n}"
executor?=?ThreadPoolExecutor()
future1?=?executor.submit(task,?5)
future2?=?executor.submit(task,?2)
future3?=?executor.submit(task,?4)
#?這里是不會(huì)阻塞的
print("start")
#?遍歷所有的?future,并調(diào)用其?result?方法
#?這樣就會(huì)等到所有的函數(shù)都執(zhí)行完畢之后才會(huì)往下走
for?future?in?[future1,?future2,?future3]:
????print(future.result())
print("end")
"""
start
sleep?5
sleep?2
sleep?4
end
"""第二種:
from?concurrent.futures?import?(
????ThreadPoolExecutor,
????wait
)
import?time
def?task(n):
????time.sleep(n)
????return?f"sleep?{n}"
executor?=?ThreadPoolExecutor()
future1?=?executor.submit(task,?5)
future2?=?executor.submit(task,?2)
future3?=?executor.submit(task,?4)
#?return_when?有三個(gè)可選參數(shù)
# FIRST_COMPLETED:當(dāng)任意一個(gè)任務(wù)完成或者取消
# FIRST_EXCEPTION:當(dāng)任意一個(gè)任務(wù)出現(xiàn)異常
#??????????????????如果都沒(méi)出現(xiàn)異常等同于ALL_COMPLETED
# ALL_COMPLETED:所有任務(wù)都完成,默認(rèn)是這個(gè)值
fs?=?wait([future1,?future2,?future3],
??????????return_when="ALL_COMPLETED")
#?此時(shí)返回的fs是DoneAndNotDoneFutures類(lèi)型的namedtuple
#?里面有兩個(gè)值,一個(gè)是done,一個(gè)是not_done
print(fs.done)
"""
{<Future?at?0x1df1400?state=finished?returned?str>,?
?<Future?at?0x2f08e48?state=finished?returned?str>,?
?<Future?at?0x9f7bf60?state=finished?returned?str>}
"""
print(fs.not_done)
"""
set()
"""
for?f?in?fs.done:
????print(f.result())
"""
start
sleep?5
sleep?2
sleep?4
end
"""第三種:
#?使用上下文管理 with?ThreadPoolExecutor()?as?executor: ????future1?=?executor.submit(task,?5) ????future2?=?executor.submit(task,?2) ????future3?=?executor.submit(task,?4) #?所有函數(shù)執(zhí)行完畢(with語(yǔ)句結(jié)束)后才會(huì)往下執(zhí)行
第四種:
executor?=?ThreadPoolExecutor() future1?=?executor.submit(task,?5) future2?=?executor.submit(task,?2) future3?=?executor.submit(task,?4) #?所有函數(shù)執(zhí)行結(jié)束后,才會(huì)往下執(zhí)行 executor.shutdown()
小結(jié)
如果我們需要啟動(dòng)多線程來(lái)執(zhí)行函數(shù)的話,那么不妨使用線程池。每調(diào)用一個(gè)函數(shù)就從池子里面取出一個(gè)線程,函數(shù)執(zhí)行完畢就將線程放回到池子里以便其它函數(shù)執(zhí)行。如果池子里面空了,或者說(shuō)無(wú)法創(chuàng)建新的空閑線程,那么接下來(lái)的函數(shù)就只能處于等待狀態(tài)了。
最后,concurrent.futures 不僅可以用于實(shí)現(xiàn)線程池,還可以用于實(shí)現(xiàn)進(jìn)程池。兩者的 API 是一樣的:
from?concurrent.futures?import?ProcessPoolExecutor
import?time
def?task(n):
????time.sleep(n)
????return?f"sleep?{n}"
executor?=?ProcessPoolExecutor()
#?Windows?上需要加上這一行
if?__name__?==?'__main__':
????future1?=?executor.submit(task,?5)
????future2?=?executor.submit(task,?2)
????future3?=?executor.submit(task,?4)
????executor.shutdown()
????print(future1.result())
????print(future2.result())
????print(future3.result())
"""
sleep?5
sleep?2
sleep?4
"""????線程池和進(jìn)程池的 API 是一致的,但工作中很少會(huì)創(chuàng)建進(jìn)程池。
以上就是Python快速實(shí)現(xiàn)一個(gè)線程池的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Python線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
瘋狂上漲的Python 開(kāi)發(fā)者應(yīng)從2.x還是3.x著手?
熱度瘋漲的 Python,開(kāi)發(fā)者應(yīng)從 2.x 還是 3.x 著手?這篇文章就為大家分析一下了Python開(kāi)發(fā)者應(yīng)從2.x還是3.x學(xué)起,感興趣的小伙伴們可以參考一下2017-11-11
利用python計(jì)算時(shí)間差(返回天數(shù))
這篇文章主要給大家介紹了關(guān)于如何利用python計(jì)算時(shí)間差(返回天數(shù))的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用python具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
Python編程產(chǎn)生非均勻隨機(jī)數(shù)的幾種方法代碼分享
這篇文章主要介紹了Python編程產(chǎn)生非均勻隨機(jī)數(shù)的幾種方法代碼分享,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12
pytorch 實(shí)現(xiàn)模型不同層設(shè)置不同的學(xué)習(xí)率方式
今天小編就為大家分享一篇pytorch 實(shí)現(xiàn)模型不同層設(shè)置不同的學(xué)習(xí)率方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-01-01
python?time模塊計(jì)算時(shí)間之間的差距(練習(xí)題)
這篇文章主要介紹了python?time模塊計(jì)算時(shí)間之間的差距,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05

