Python實現(xiàn)線程池之線程安全隊列
本文實例為大家分享了Python實現(xiàn)線程池之線程安全隊列的具體代碼,供大家參考,具體內(nèi)容如下
一、線程池組成
一個完整的線程池由下面幾部分組成,線程安全隊列、任務(wù)對象、線程處理對象、線程池對象。其中一個線程安全的隊列是實現(xiàn)線程池和任務(wù)隊列的基礎(chǔ),本節(jié)我們通過threading包中的互斥量threading.Lock()和條件變量threading.Condition()來實現(xiàn)一個簡單的、讀取安全的線程隊列。

二、線程安全隊列的實現(xiàn)
包括put、pop、get等方法,為保證線程安全,讀寫操作時要添加互斥鎖;并且pop操作可以設(shè)置等待時間以阻塞當(dāng)前獲取元素的線程,當(dāng)新元素寫入隊列時通過條件變量通知解除等待操作。
class ThreadSafeQueue(object): ? ? def __init__(self, max_size=0): ? ? ? ? self.queue = [] ? ? ? ? self.max_size = max_size ?# max_size為0表示無限大 ? ? ? ? self.lock = threading.Lock() ?# 互斥量 ? ? ? ? self.condition = threading.Condition() ?# 條件變量 ? ? def size(self): ? ? ? ? """ ? ? ? ? 獲取當(dāng)前隊列的大小 ? ? ? ? :return: 隊列長度 ? ? ? ? """ ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? size = len(self.queue) ? ? ? ? self.lock.release() ? ? ? ? return size ? ? def put(self, item): ? ? ? ? """ ? ? ? ? 將單個元素放入隊列 ? ? ? ? :param item: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? # 隊列已滿 max_size為0表示無限大 ? ? ? ? if self.max_size != 0 and self.size() >= self.max_size: ? ? ? ? ? ? return ThreadSafeException() ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? self.queue.append(item) ? ? ? ? self.lock.release() ? ? ? ? self.condition.acquire() ? ? ? ? # 通知等待讀取的線程 ? ? ? ? self.condition.notify() ? ? ? ? self.condition.release() ? ? ? ? return item ? ? def batch_put(self, item_list): ? ? ? ? """ ? ? ? ? 批量添加元素 ? ? ? ? :param item_list: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if not isinstance(item_list, list): ? ? ? ? ? ? item_list = list(item_list) ? ? ? ? res = [self.put(item) for item in item_list] ? ? ? ? return res ? ? def pop(self, block=False, timeout=0): ? ? ? ? """ ? ? ? ? 從隊列頭部取出元素 ? ? ? ? :param block: 是否阻塞線程 ? ? ? ? :param timeout: 等待時間 ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if self.size() == 0: ? ? ? ? ? ? if block: ? ? ? ? ? ? ? ? self.condition.acquire() ? ? ? ? ? ? ? ? self.condition.wait(timeout) ? ? ? ? ? ? ? ? self.condition.release() ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? return None ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? item = None ? ? ? ? if len(self.queue): ? ? ? ? ? ? item = self.queue.pop() ? ? ? ? self.lock.release() ? ? ? ? return item ? ? def get(self, index): ? ? ? ? """ ? ? ? ? 獲取指定位置的元素 ? ? ? ? :param index: ? ? ? ? :return: ? ? ? ? """ ? ? ? ? if self.size() == 0 or index >= self.size(): ? ? ? ? ? ? return None ? ? ? ? # 加鎖 ? ? ? ? self.lock.acquire() ? ? ? ? item = self.queue[index] ? ? ? ? self.lock.release() ? ? ? ? return item class ThreadSafeException(Exception): ? ? pass
三、測試邏輯
3.1、測試阻塞邏輯
def thread_queue_test_1():
? ? thread_queue = ThreadSafeQueue(10)
? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)
? ? def consumer():
? ? ? ? while True:
? ? ? ? ? ? print('current time before pop is %d' % time.time())
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=3)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('get value from queue is %s' % item)
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print(item)
? ? ? ? ? ? print('current time after pop is %d' % time.time())
? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer)
? ? t1.start()
? ? t2.start()
? ? t1.join()
? ? t2.join()測試結(jié)果:
我們可以看到生產(chǎn)者線程每隔2s向隊列寫入一個元素,消費者線程當(dāng)無數(shù)據(jù)時默認(rèn)阻塞3s。通過執(zhí)行時間發(fā)現(xiàn)消費者線程確實發(fā)生了阻塞,當(dāng)生產(chǎn)者寫入數(shù)據(jù)時結(jié)束當(dāng)前等待操作。

3.2、測試讀寫加鎖邏輯
def thread_queue_test_2():
? ? thread_queue = ThreadSafeQueue(10)
? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)
? ? def consumer(name):
? ? ? ? while True:
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=1)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('%s get value from queue is %s' % (name, item))
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print('%s get value from queue is None' % name)
? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer, args=('thread1',))
? ? t3 = threading.Thread(target=consumer, args=('thread2',))
? ? t1.start()
? ? t2.start()
? ? t3.start()
? ? t1.join()
? ? t2.join()
? ? t3.join()測試結(jié)果:
生產(chǎn)者還是每2s生成一個元素寫入隊列,消費者開啟兩個線程進(jìn)行消費,默認(rèn)阻塞時間為1s,打印結(jié)果顯示通過加鎖確保每次只有一個線程能獲取數(shù)據(jù),保證了線程讀寫的安全。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python搭建Keras CNN模型破解網(wǎng)站驗證碼的實現(xiàn)
這篇文章主要介紹了Python搭建Keras CNN模型破解網(wǎng)站驗證碼的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04
Python學(xué)習(xí)之字符串函數(shù)使用詳解
Python的友好在于提供了非常好強(qiáng)大的功能函數(shù)模塊,對于字符串的使用,同樣提供許多簡單便捷的字符串函數(shù)。Python 字符串自帶了很多有用的函數(shù),快來跟隨小編學(xué)習(xí)一下這些函數(shù)的應(yīng)用詳解吧2021-12-12
django數(shù)據(jù)模型on_delete, db_constraint的使用詳解
這篇文章主要介紹了django數(shù)據(jù)模型on_delete, db_constraint的使用詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
tensorflow: variable的值與variable.read_value()的值區(qū)別詳解
今天小編就為大家分享一篇tensorflow: variable的值與variable.read_value()的值區(qū)別詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07
NetWorkX使用方法及nx.draw()相關(guān)參數(shù)解讀
這篇文章主要介紹了NetWorkX使用方法及nx.draw()相關(guān)參數(shù)解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12
pytorch方法測試——激活函數(shù)(ReLU)詳解
今天小編就為大家分享一篇pytorch方法測試——激活函數(shù)(ReLU)詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01

