Python自定義線程池實(shí)現(xiàn)方法分析
本文實(shí)例講述了Python自定義線程池實(shí)現(xiàn)方法。分享給大家供大家參考,具體如下:
關(guān)于python的多線程,由與GIL的存在被廣大群主所詬病,說python的多線程不是真正的多線程。但多線程處理IO密集的任務(wù)效率還是可以杠杠的。
我實(shí)現(xiàn)的這個(gè)線程池其實(shí)是根據(jù)銀角的思路來實(shí)現(xiàn)的。
主要思路:
任務(wù)獲取和執(zhí)行:
1、任務(wù)加入隊(duì)列,等待線程來獲取并執(zhí)行。
2、按需生成線程,每個(gè)線程循環(huán)取任務(wù)。
線程銷毀:
1、獲取任務(wù)是終止符時(shí),線程停止。
2、線程池close()時(shí),向任務(wù)隊(duì)列加入和已生成線程等量的終止符。
3、線程池terminate()時(shí),設(shè)置線程下次任務(wù)取到為終止符。
流程概要設(shè)計(jì):

詳細(xì)代碼:
import threading
import contextlib
from Queue import Queue
import time
class ThreadPool(object):
def __init__(self, max_num):
self.StopEvent = 0#線程任務(wù)終止符,當(dāng)線程從隊(duì)列獲取到StopEvent時(shí),代表此線程可以銷毀??稍O(shè)置為任意與任務(wù)有區(qū)別的值。
self.q = Queue()
self.max_num = max_num #最大線程數(shù)
self.terminal = False #是否設(shè)置線程池強(qiáng)制終止
self.created_list = [] #已創(chuàng)建線程的線程列表
self.free_list = [] #空閑線程的線程列表
self.Deamon=False #線程是否是后臺(tái)線程
def run(self, func, args, callback=None):
"""
線程池執(zhí)行一個(gè)任務(wù)
:param func: 任務(wù)函數(shù)
:param args: 任務(wù)函數(shù)所需參數(shù)
:param callback:
:return: 如果線程池已經(jīng)終止,則返回True否則None
"""
if len(self.free_list) == 0 and len(self.created_list) < self.max_num:
self.create_thread()
task = (func, args, callback,)
self.q.put(task)
def create_thread(self):
"""
創(chuàng)建一個(gè)線程
"""
t = threading.Thread(target=self.call)
t.setDaemon(self.Deamon)
t.start()
self.created_list.append(t)#將當(dāng)前線程加入已創(chuàng)建線程列表created_list
def call(self):
"""
循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)
"""
current_thread = threading.current_thread() #獲取當(dāng)前線程對象·
event = self.q.get() #從任務(wù)隊(duì)列獲取任務(wù)
while event != self.StopEvent: #判斷獲取到的任務(wù)是否是終止符
func, arguments, callback = event#從任務(wù)中獲取函數(shù)名、參數(shù)、和回調(diào)函數(shù)名
try:
result = func(*arguments)
func_excute_status =True#func執(zhí)行成功狀態(tài)
except Exception as e:
func_excute_status = False
result =None
print '函數(shù)執(zhí)行產(chǎn)生錯(cuò)誤', e#打印錯(cuò)誤信息
if func_excute_status:#func執(zhí)行成功后才能執(zhí)行回調(diào)函數(shù)
if callback is not None:#判斷回調(diào)函數(shù)是否是空的
try:
callback(result)
except Exception as e:
print '回調(diào)函數(shù)執(zhí)行產(chǎn)生錯(cuò)誤', e # 打印錯(cuò)誤信息
with self.worker_state(self.free_list,current_thread):
#執(zhí)行完一次任務(wù)后,將線程加入空閑列表。然后繼續(xù)去取任務(wù),如果取到任務(wù)就將線程從空閑列表移除
if self.terminal:#判斷線程池終止命令,如果需要終止,則使下次取到的任務(wù)為StopEvent。
event = self.StopEvent
else: #否則繼續(xù)獲取任務(wù)
event = self.q.get() # 當(dāng)線程等待任務(wù)時(shí),q.get()方法阻塞住線程,使其持續(xù)等待
else:#若線程取到的任務(wù)是終止符,就銷毀線程
#將當(dāng)前線程從已創(chuàng)建線程列表created_list移除
self.created_list.remove(current_thread)
def close(self):
"""
執(zhí)行完所有的任務(wù)后,所有線程停止
"""
full_size = len(self.created_list)#按已創(chuàng)建的線程數(shù)量往線程隊(duì)列加入終止符。
while full_size:
self.q.put(self.StopEvent)
full_size -= 1
def terminate(self):
"""
無論是否還有任務(wù),終止線程
"""
self.terminal = True
while self.created_list:
self.q.put(self.StopEvent)
self.q.queue.clear()#清空任務(wù)隊(duì)列
def join(self):
"""
阻塞線程池上下文,使所有線程執(zhí)行完后才能繼續(xù)
"""
for t in self.created_list:
t.join()
@contextlib.contextmanager#上下文處理器,使其可以使用with語句修飾
def worker_state(self, state_list, worker_thread):
"""
用于記錄線程中正在等待的線程數(shù)
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
if __name__ == '__main__':
def Foo(arg):
return arg
# time.sleep(0.1)
def Bar(res):
print res
pool=ThreadPool(5)
# pool.Deamon=True#需在pool.run之前設(shè)置
for i in range(1000):
pool.run(func=Foo,args=(i,),callback=Bar)
pool.close()
pool.join()
# pool.terminate()
print "任務(wù)隊(duì)列里任務(wù)數(shù)%s" %pool.q.qsize()
print "當(dāng)前存活子線程數(shù)量:%d" % threading.activeCount()
print "當(dāng)前線程創(chuàng)建列表:%s" %pool.created_list
print "當(dāng)前線程創(chuàng)建列表:%s" %pool.free_list
關(guān)于上下文處理:
來個(gè)簡單例子說明:
下面的代碼手動(dòng)自定義了一個(gè)myopen方法,模擬我們常見的with open() as f:語句。具體的contextlib模塊使用,會(huì)單獨(dú)開章來將。
# coding:utf-8
import contextlib
@contextlib.contextmanager#定義該函數(shù)支持上下文with語句
def myopen(filename,mode):
f=open(filename,mode)
try:
yield f.readlines()#正常執(zhí)行返回f.readlines()
except Exception as e:
print e
finally:
f.close()#最后在with代碼快執(zhí)行完畢后返回執(zhí)行finally下的f.close()實(shí)現(xiàn)關(guān)閉文件
if __name__ == '__main__':
with myopen(r'c:\ip1.txt','r') as f:
for line in f:
print line
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進(jìn)階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設(shè)計(jì)有所幫助。
相關(guān)文章
淺析Python中壓縮zipfile與解壓縮tarfile模塊的使用
Python?提供了兩個(gè)標(biāo)準(zhǔn)庫模塊來處理文件的壓縮和解壓縮操作:zipfile和tarfile,本文將分享?這兩個(gè)模塊的使用方法,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-10-10
tensorflow入門:tfrecord 和tf.data.TFRecordDataset的使用
今天小編就為大家分享一篇tensorflow入門:tfrecord 和tf.data.TFRecordDataset的使用,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01
Pytorch框架構(gòu)建ResNet模型的實(shí)現(xiàn)示例
本文主要介紹了Pytorch框架構(gòu)建ResNet模型的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06
pycharm中TensorFlow調(diào)試常見問題小結(jié)
本文主要介紹了在pycharm下調(diào)用tensorflow庫時(shí)會(huì)出現(xiàn)的問題,在本文做個(gè)小結(jié),也給自己留個(gè)筆記,感興趣的可以了解一下2021-06-06
python網(wǎng)絡(luò)爬蟲selenium打開多窗口與切換頁面的實(shí)現(xiàn)
本文主要介紹了python網(wǎng)絡(luò)爬蟲selenium打開多窗口與切換頁面的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01
python3.6.3轉(zhuǎn)化為win-exe文件發(fā)布的方法
今天小編就為大家分享一篇python3.6.3轉(zhuǎn)化為win-exe文件發(fā)布的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-10-10
python淺析守護(hù)線程與非守護(hù)線程的區(qū)別與使用
守護(hù)線程,又稱后臺(tái)線程,它是在后臺(tái)運(yùn)行的,如果所有前臺(tái)線程都死亡,那么后臺(tái)線程就會(huì)自動(dòng)死亡,本章我們來了解守護(hù)線程與非守護(hù)線程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-08-08

