理解生產(chǎn)者消費(fèi)者模型及在Python編程中的運(yùn)用實(shí)例
什么是生產(chǎn)者消費(fèi)者模型
在 工作中,大家可能會(huì)碰到這樣一種情況:某個(gè)模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個(gè)模塊來負(fù)責(zé)處理(此處的模塊是廣義的,可以是類、函數(shù)、線程、進(jìn)程等)。產(chǎn) 生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費(fèi)者。在生產(chǎn)者與消費(fèi)者之間在加個(gè)緩沖區(qū),我們形象的稱之為倉庫,生產(chǎn)者負(fù)責(zé)往倉庫了進(jìn)商 品,而消費(fèi)者負(fù)責(zé)從倉庫里拿商品,這就構(gòu)成了生產(chǎn)者消費(fèi)者模型。結(jié)構(gòu)圖如下:

生產(chǎn)者消費(fèi)者模型的優(yōu)點(diǎn):
1、解耦
假設(shè)生產(chǎn)者和消費(fèi)者分別是兩個(gè)類。如果讓生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,那么生產(chǎn)者對于消費(fèi)者就會(huì)產(chǎn)生依賴(也就是耦合)。將來如果消費(fèi)者的代碼發(fā)生變化, 可能會(huì)影響到生產(chǎn)者。而如果兩者都依賴于某個(gè)緩沖區(qū),兩者之間不直接依賴,耦合也就相應(yīng)降低了。
舉個(gè)例子,我們?nèi)ム]局投遞信件,如果不使用郵筒(也就是緩沖區(qū)),你必須得把信直接交給郵遞員。有同學(xué)會(huì)說,直接給郵遞員不是挺簡單的嘛?其實(shí)不簡單,你必須 得認(rèn)識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產(chǎn)生和你和郵遞員之間的依賴(相當(dāng)于生產(chǎn)者和消費(fèi)者的強(qiáng)耦合)。萬一哪天郵遞員換人了,你還要重新認(rèn)識一下(相當(dāng)于消費(fèi)者變化導(dǎo)致修改生產(chǎn)者代碼)。而郵筒相對來說比較固定,你依賴它的成本就比較低(相當(dāng)于和緩沖區(qū)之間的弱耦合)。
2、支持并發(fā)
由于生產(chǎn)者與消費(fèi)者是兩個(gè)獨(dú)立的并發(fā)體,他們之間是用緩沖區(qū)作為橋梁連接,生產(chǎn)者只需要往緩沖區(qū)里丟數(shù)據(jù),就可以繼續(xù)生產(chǎn)下一個(gè)數(shù)據(jù),而消費(fèi)者只需要從緩沖區(qū)了拿數(shù)據(jù)即可,這樣就不會(huì)因?yàn)楸舜说奶幚硭俣榷l(fā)生阻塞。
接上面的例子,如果我們不使用郵筒,我們就得在郵局等郵遞員,直到他回來,我們把信件交給他,這期間我們啥事兒都不能干(也就是生產(chǎn)者阻塞),或者郵遞員得挨家挨戶問,誰要寄信(相當(dāng)于消費(fèi)者輪詢)。
3、支持忙閑不均
緩沖區(qū)還有另一個(gè)好處。如果制造數(shù)據(jù)的速度時(shí)快時(shí)慢,緩沖區(qū)的好處就體現(xiàn)出來了。當(dāng)數(shù)據(jù)制造快的時(shí)候,消費(fèi)者來不及處理,未處理的數(shù)據(jù)可以暫時(shí)存在緩沖區(qū)中。 等生產(chǎn)者的制造速度慢下來,消費(fèi)者再慢慢處理掉。
為了充分復(fù)用,我們再拿寄信的例子來說事。假設(shè)郵遞員一次只能帶走1000封信。萬一某次碰上情人節(jié)(也可能是圣誕節(jié))送賀卡,需要寄出去的信超過1000封,這時(shí) 候郵筒這個(gè)緩沖區(qū)就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來 時(shí)再拿走。
Python示例:
利用隊(duì)列實(shí)現(xiàn)簡單的生產(chǎn)者消費(fèi)者模型,生產(chǎn)者產(chǎn)生時(shí)間放入隊(duì)列,消費(fèi)者取出時(shí)間打印
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
msg = self._queue.get()
if isinstance(msg, str) and msg == 'quit':
break
print "I'm a thread, and I received %s!!" % msg
print 'Bye byes!'
def producer():
queue = Queue.Queue()
worker = Consumer(queue)
worker.start() # 開啟消費(fèi)者線程
start_time = time.time()
while time.time() - start_time < 5:
queue.put('something at %s' % time.time())
time.sleep(1)
queue.put('quit')
worker.join()
if __name__ == '__main__':
producer()
使用多線程,在做爬蟲的時(shí)候,生產(chǎn)者用著產(chǎn)生url鏈接,消費(fèi)者用于獲取url數(shù)據(jù),在隊(duì)列的幫助下可以使用多線程加快爬蟲速度。
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
print content
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Producer():
urls = [
'http://211.103.242.133:8080/Disease/Details.aspx?id=2258',
'http://211.103.242.133:8080/Disease/Details.aspx?id=2258',
'http://211.103.242.133:8080/Disease/Details.aspx?id=2258',
'http://211.103.242.133:8080/Disease/Details.aspx?id=2258'
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
for url in urls:
queue.put(url)
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Producer()
相關(guān)文章
numpy數(shù)據(jù)類型dtype轉(zhuǎn)換實(shí)現(xiàn)
這篇文章主要介紹了numpy數(shù)據(jù)類型dtype轉(zhuǎn)換實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
python 實(shí)現(xiàn)任務(wù)管理清單案例
一文教你如何用Python輕輕松松操作Excel,Word,CSV
Pycharm無法顯示動(dòng)態(tài)圖片的解決方法

