python多進(jìn)程下的生產(chǎn)者和消費(fèi)者模型
一、生產(chǎn)者消費(fèi)者模型介紹
1.1 為什么需要使用生產(chǎn)者消費(fèi)者模型
生產(chǎn)者是指生產(chǎn)數(shù)據(jù)的任務(wù),消費(fèi)者是指消費(fèi)數(shù)據(jù)的任務(wù)。當(dāng)生產(chǎn)者的生產(chǎn)能力遠(yuǎn)大于消費(fèi)者的消費(fèi)能力,生產(chǎn)者就需要等消費(fèi)者消費(fèi)完才能繼續(xù)生產(chǎn)新的數(shù)據(jù),同理,如果消費(fèi)者的消費(fèi)能力遠(yuǎn)大于生產(chǎn)者的生產(chǎn)能力,消費(fèi)者就需要等生產(chǎn)者生產(chǎn)完數(shù)據(jù)才能繼續(xù)消費(fèi),這種等待會(huì)造成效率的低下,為了解決這種問題就引入了生產(chǎn)者消費(fèi)者模型。
1.2 如何實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
進(jìn)程間引入隊(duì)列可以實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型,通過使用隊(duì)列無需考慮鎖的概念,因?yàn)檫M(jìn)程間的通信是通過隊(duì)列來實(shí)現(xiàn)的;
生產(chǎn)者生產(chǎn)的數(shù)據(jù)往隊(duì)列里面寫,消費(fèi)者消費(fèi)數(shù)據(jù)直接從隊(duì)列里面取,這樣就對(duì)實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的解耦。
生產(chǎn)者 -- > 隊(duì)列 <--消費(fèi)者
二、Queue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
2.1 消費(fèi)者生產(chǎn)者模型代碼
from multiprocessing import Process, Queue
import time
# 消費(fèi)者方法
def consumer(q, name):
while True:
res = q.get()
# if res is None: break
print("%s 吃了 %s" % (name, res))
# 生產(chǎn)者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模擬生產(chǎn)西瓜的時(shí)間延遲
res = "%s %s" % (food, i)
print("%s 生產(chǎn)了 %s" % (name, res))
# 把生產(chǎn)的vegetable放入到隊(duì)列中
q.put(res)
if __name__ == "__main__":
#創(chuàng)建隊(duì)列
q = Queue()
# 創(chuàng)建生產(chǎn)者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
c1 = Process(target=consumer, args=(q, "peter",))
p1.start()
c1.start()
# p1.join()
# q.put(None)
print("主進(jìn)程")
2.2 執(zhí)行結(jié)果
2.2.1 直接執(zhí)行上面的代碼的結(jié)果
直接執(zhí)行會(huì)出現(xiàn)一個(gè)問題就是生產(chǎn)者生產(chǎn)完了,沒有向消費(fèi)者發(fā)送一個(gè)停止的信號(hào),所以消費(fèi)者一直會(huì)一直阻塞在q.get(),導(dǎo)致程序無法退出。

為了解決上面的問題,讓消費(fèi)者消費(fèi)完了生產(chǎn)者的數(shù)據(jù)之后自動(dòng)退出,就需要在生產(chǎn)者進(jìn)程介紹的時(shí)候往隊(duì)列里面put一個(gè)結(jié)束信號(hào),消費(fèi)者拿到這個(gè)信號(hào),就退出消費(fèi)進(jìn)程。
主要是兩個(gè)地方修改 ,把下方代碼的注釋打開就可以實(shí)現(xiàn)消費(fèi)者消費(fèi)完接收到生產(chǎn)者的結(jié)束信號(hào)就退出消費(fèi)者進(jìn)程了。
def consumer(): if res is None: break if __name__ == "__main__": p1.join() q.put(None)
2.2.2 把注釋打開后的運(yùn)行結(jié)果
把注釋打開后,消費(fèi)者拿到了生產(chǎn)者發(fā)送的結(jié)束信號(hào),可以正常退出程序了。

但如果有n個(gè)消費(fèi)者,就需要發(fā)送n個(gè)結(jié)束信號(hào),這種方式就不是那么簡(jiǎn)潔,像下面的代碼這樣:
from multiprocessing import Process, Queue
import time
# 消費(fèi)者方法
def consumer(q, name):
while True:
res = q.get()
if res is None: break
print("%s 吃了 %s" % (name, res))
# 生產(chǎn)者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模擬生產(chǎn)西瓜的時(shí)間延遲
res = "%s %s" % (food, i)
print("%s 生產(chǎn)了 %s" % (name, res))
# 把生產(chǎn)的vegetable放入到隊(duì)列中
q.put(res)
if __name__ == "__main__":
# 創(chuàng)建隊(duì)列
q = Queue()
# 創(chuàng)建生產(chǎn)者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
p2 = Process(target=producer, args=(q, "kelly2", "香蕉"))
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
q.put(None)
print("主進(jìn)程")
其實(shí)我們現(xiàn)在就是生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后想往隊(duì)列里面發(fā)送一個(gè)結(jié)束信號(hào),python語言提供了另外一種隊(duì)列JoinableQueue([maxsize])來解決這種問題
三、JoinableQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
3.1 JoinableQueue方法介紹
JoinableQueue([maxsize]) : A queue type which also supports join() and task_done() methods
q.task_done():消費(fèi)者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。
q.join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理;阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止。
3.2 JoinableQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型源碼
from multiprocessing import Process,JoinableQueue
import time
# 消費(fèi)者方法
def consumer(q, name):
while True:
res = q.get()
if res is None: break
print("%s 吃了 %s" % (name, res))
q.task_done() # 發(fā)送信號(hào)給q.join(),表示已經(jīng)從隊(duì)列中取走一個(gè)值并處理完畢了
# 生產(chǎn)者方法
def producer(q, name, food):
for i in range(3):
time.sleep(1) # 模擬生產(chǎn)西瓜的時(shí)間延遲
res = "%s %s" % (food, i)
print("%s 生產(chǎn)了 %s" % (name, res))
# 把生產(chǎn)的vegetable放入到隊(duì)列中
q.put(res)
q.join() # 等消費(fèi)者把自己放入隊(duì)列的所有元素取完之后才結(jié)束
if __name__ == "__main__":
# q = Queue()
q = JoinableQueue()
# 創(chuàng)建生產(chǎn)者
p1 = Process(target=producer, args=(q, "kelly", "西瓜"))
p2 = Process(target=producer, args=(q, "kelly2", "藍(lán)莓"))
# 創(chuàng)建消費(fèi)者
c1 = Process(target=consumer, args=(q, "peter",))
c2 = Process(target=consumer, args=(q, "peter2",))
c3 = Process(target=consumer, args=(q, "peter3",))
c1.daemon = True
c2.daemon = True
c3.daemon = True
p_l = [p1, p2, c1, c2, c3]
for p in p_l:
p.start()
p1.join()
p2.join()
# 1.主進(jìn)程等待p1,p2進(jìn)程結(jié)束才繼續(xù)執(zhí)行
# 2.由于q.join()的存在,生產(chǎn)者只有等隊(duì)列中的元素被消費(fèi)完才會(huì)結(jié)束
# 3.生產(chǎn)者結(jié)束了,就代表消費(fèi)者已經(jīng)消費(fèi)完了,也可以結(jié)束了,所以可以把消費(fèi)者設(shè)置為守護(hù)進(jìn)程(隨著主進(jìn)程的退出而退出)
print("主進(jìn)程")
3.3 運(yùn)行結(jié)果
通過運(yùn)行結(jié)果可以看出,生產(chǎn)者沒有手動(dòng)發(fā)送結(jié)束信號(hào)給消費(fèi)者,而是通過JoinableQueue隊(duì)列的方式也實(shí)現(xiàn)了生產(chǎn)者消費(fèi)者模型。

到此這篇關(guān)于python多進(jìn)程下的生產(chǎn)者和消費(fèi)者模型的文章就介紹到這了,更多相關(guān)python多進(jìn)程下的生產(chǎn)者和消費(fèi)者內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python自動(dòng)發(fā)微信監(jiān)控報(bào)警
這篇文章主要為大家詳細(xì)介紹了python自動(dòng)發(fā)微信監(jiān)控報(bào)警,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-09-09
Python爬蟲實(shí)戰(zhàn)之爬取京東商品數(shù)據(jù)并實(shí)實(shí)現(xiàn)數(shù)據(jù)可視化
今天再帶大家簡(jiǎn)單爬一波京東的商品數(shù)據(jù)唄,廢話不多說,文中有非常詳細(xì)的代碼示例,需要的朋友可以參考下2021-06-06
Python數(shù)據(jù)結(jié)構(gòu)集合的相關(guān)詳解
集合是Python中一種無序且元素唯一的數(shù)據(jù)結(jié)構(gòu),主要用于存儲(chǔ)不重復(fù)的元素,Python提供set類型表示集合,可通過{}或set()創(chuàng)建,集合元素不可重復(fù)且無序,不支持索引訪問,但可迭代,集合可變,支持添加、刪除元素,集合操作包括并集、交集、差集等,可通過運(yùn)算符或方法執(zhí)行2024-09-09
python爬取網(wǎng)頁內(nèi)容轉(zhuǎn)換為PDF文件
這篇文章主要為大家詳細(xì)介紹了python爬取網(wǎng)頁內(nèi)容轉(zhuǎn)換為PDF文件,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-06-06
python3+PyQt5+Qt Designer實(shí)現(xiàn)擴(kuò)展對(duì)話框
這篇文章主要為大家詳細(xì)介紹了python3+PyQt5+Qt Designer實(shí)現(xiàn)擴(kuò)展對(duì)話框,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-04-04
淺談對(duì)Python變量的一些認(rèn)識(shí)理解
變量(variable)是編程的基礎(chǔ)概念,Python 的變量看似簡(jiǎn)單,深入了解卻不易.文中有非常詳細(xì)的介紹及代碼示例,對(duì)正在學(xué)習(xí)python的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05
python 自動(dòng)監(jiān)控最新郵件并讀取的操作
這篇文章主要介紹了python 自動(dòng)監(jiān)控最新郵件并讀取的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-03-03

