Python全棧之隊列詳解
更新時間:2021年12月24日 17:21:29 作者:熬夜泡枸杞
這篇文章主要為大家介紹了Python全棧之隊列,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
1. lock互斥鎖
知識點:
lock.acquire()# 上鎖 lock.release()# 解鎖 #同一時間允許一個進程上一把鎖 就是Lock 加鎖可以保證多個進程修改同一塊數(shù)據(jù)時,同一時間只能有一個任務(wù)可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲速度卻保證了數(shù)據(jù)安全。 #同一時間允許多個進程上多把鎖 就是[信號量Semaphore] 信號量是鎖的變形: 實際實現(xiàn)是 計數(shù)器 + 鎖,同時允許多個進程上鎖 # 互斥鎖Lock : 互斥鎖就是進程的互相排斥,誰先搶到資源,誰就上鎖改資源內(nèi)容,為了保證數(shù)據(jù)的同步性 # 注意:多個鎖一起上,不開鎖,會造成死鎖.上鎖和解鎖是一對.
程序?qū)崿F(xiàn):
# ### 鎖 lock 互斥鎖
from multiprocessing import Process,Lock
""" 上鎖和解鎖是一對, 連續(xù)上鎖不解鎖是死鎖 ,只有在解鎖的狀態(tài)下,其他進程才有機會上鎖 """
"""
# 創(chuàng)建一把鎖
lock = Lock()
# 上鎖
lock.acquire()
# lock.acquire() # 連續(xù)上鎖,造成了死鎖現(xiàn)象;
print("我在裊裊炊煙 .. 你在焦急等待 ... 廁所進行時 ... ")
# 解鎖
lock.release()
"""
# ### 12306 搶票軟件
import json,time,random
# 1.讀寫數(shù)據(jù)庫當(dāng)中的票數(shù)
def wr_info(sign , dic=None):
if sign == "r":
with open("ticket",mode="r",encoding="utf-8") as fp:
dic = json.load(fp)
return dic
elif sign == "w":
with open("ticket",mode="w",encoding="utf-8") as fp:
json.dump(dic,fp)
# dic = wr_info("w",dic={"count":0})
# print(dic , type(dic) )
# 2.執(zhí)行搶票的方法
def get_ticket(person):
# 先獲取數(shù)據(jù)庫中實際票數(shù)
dic = wr_info("r")
# 模擬一下網(wǎng)絡(luò)延遲
time.sleep(random.uniform(0.1,0.7))
# 判斷票數(shù)
if dic["count"] > 0:
print("{}搶到票了".format(person))
# 搶到票后,讓當(dāng)前票數(shù)減1
dic["count"] -= 1
# 更新數(shù)據(jù)庫中的票數(shù)
wr_info("w",dic)
else:
print("{}沒有搶到票哦".format(person))
# 3.對搶票和讀寫票數(shù)做一個統(tǒng)一的調(diào)用
def main(person,lock):
# 查看剩余票數(shù)
dic = wr_info("r")
print("{}查看票數(shù)剩余: {}".format(person,dic["count"]))
# 上鎖
lock.acquire()
# 開始搶票
get_ticket(person)
# 解鎖
lock.release()
if __name__ == "__main__":
lock = Lock()
lst = ["梁新宇","康裕康","張保張","于朝志","薛宇健","韓瑞瑞","假摔先","劉子濤","黎明輝","趙鳳勇"]
for i in lst:
p = Process( target=main,args=( i , lock ) )
p.start()
"""
創(chuàng)建進程,開始搶票是異步并發(fā)程序
直到開始搶票的時候,變成同步程序,
先搶到鎖資源的先執(zhí)行,后搶到鎖資源的后執(zhí)行;
按照順序依次執(zhí)行;是同步程序;
搶票的時候,變成同步程序,好處是可以等到數(shù)據(jù)修改完成之后,在讓下一個人搶,保證數(shù)據(jù)不亂。
如果不上鎖的話,只剩一張票的時候,那么所有的人都能搶到票,因為程序執(zhí)行的速度太快,所以接近同步進程,導(dǎo)致數(shù)據(jù)也不對。
"""
ticket文件
{"count": 0}

2. 事件_紅綠燈效果
2.1 信號量_semaphore
# ### 信號量 Semaphore 本質(zhì)上就是鎖,只不過是多個進程上多把鎖,可以控制上鎖的數(shù)量
"""Semaphore = lock + 數(shù)量 """
from multiprocessing import Semaphore , Process
import time , random
"""
# 同一時間允許多個進程上5把鎖
sem = Semaphore(5)
#上鎖
sem.acquire()
print("執(zhí)行操作 ... ")
#解鎖
sem.release()
"""
def singsong_ktv(person,sem):
# 上鎖
sem.acquire()
print("{}進入了唱吧ktv , 正在唱歌 ~".format(person))
# 唱一段時間
time.sleep( random.randrange(4,8) ) # 4 5 6 7
print("{}離開了唱吧ktv , 唱完了 ... ".format(person))
# 解鎖
sem.release()
if __name__ == "__main__":
sem = Semaphore(5)
lst = ["趙鳳勇" , "沈思雨", "趙萬里" , "張宇" , "假率先" , "孫杰龍" , "陳璐" , "王雨涵" , "楊元濤" , "劉一鳳" ]
for i in lst:
p = Process(target=singsong_ktv , args = (i , sem) )
p.start()
"""
# 總結(jié): Semaphore 可以設(shè)置上鎖的數(shù)量 , 同一時間上多把鎖
創(chuàng)建進程時,是異步并發(fā),執(zhí)行任務(wù)時,是同步程序;
"""
# 趙萬里進入了唱吧ktv , 正在唱歌 ~
# 趙鳳勇進入了唱吧ktv , 正在唱歌 ~
# 張宇進入了唱吧ktv , 正在唱歌 ~
# 沈思雨進入了唱吧ktv , 正在唱歌 ~
# 孫杰龍進入了唱吧ktv , 正在唱歌 ~
2.2 事件_紅綠燈效果
# ### 事件 (Event)
"""
# 阻塞事件 :
e = Event()生成事件對象e
e.wait()動態(tài)給程序加阻塞 , 程序當(dāng)中是否加阻塞完全取決于該對象中的is_set() [默認返回值是False]
# 如果是True 不加阻塞
# 如果是False 加阻塞
# 控制這個屬性的值
# set()方法 將這個屬性的值改成True
# clear()方法 將這個屬性的值改成False
# is_set()方法 判斷當(dāng)前的屬性是否為True (默認上來是False)
"""
from multiprocessing import Process,Event
import time , random
# 1
'''
e = Event()
# 默認屬性值是False.
print(e.is_set())
# 判斷內(nèi)部成員屬性是否是False
e.wait()
# 如果是False , 代碼程序阻塞
print(" 代碼執(zhí)行中 ... ")
'''
# 2
'''
e = Event()
# 將這個屬性的值改成True
e.set()
# 判斷內(nèi)部成員屬性是否是True
e.wait()
# 如果是True , 代碼程序不阻塞
print(" 代碼執(zhí)行中 ... ")
# 將這個屬性的值改成False
e.clear()
e.wait()
print(" 代碼執(zhí)行中 .... 2")
'''
# 3
"""
e = Event()
# wait(3) 代表最多等待3秒;
e.wait(3)
print(" 代碼執(zhí)行中 .... 3")
"""
# ### 模擬經(jīng)典紅綠燈效果
# 紅綠燈切換
def traffic_light(e):
print("紅燈亮")
while True:
if e.is_set():
# 綠燈狀態(tài) -> 切紅燈
time.sleep(1)
print("紅燈亮")
# True => False
e.clear()
else:
# 紅燈狀態(tài) -> 切綠燈
time.sleep(1)
print("綠燈亮")
# False => True
e.set()
# e = Event()
# traffic_light(e)
# 車的狀態(tài)
def car(e,i):
# 判斷是否是紅燈,如果是加上wait阻塞
if not e.is_set():
print("car{} 在等待 ... ".format(i))
e.wait()
# 否則不是,代表綠燈通行;
print("car{} 通行了 ... ".format(i))
"""
# 1.全國紅綠燈
if __name__ == "__main__":
e = Event()
# 創(chuàng)建交通燈
p1 = Process(target=traffic_light , args=(e,))
p1.start()
# 創(chuàng)建小車進程
for i in range(1,21):
time.sleep(random.randrange(2))
p2 = Process(target=car , args=(e,i))
p2.start()
"""
# 2.包頭紅綠燈,沒有車的時候,把紅綠燈關(guān)了,省電;
if __name__ == "__main__":
lst = []
e = Event()
# 創(chuàng)建交通燈
p1 = Process(target=traffic_light , args=(e,))
# 設(shè)置紅綠燈為守護進程
p1.daemon = True
p1.start()
# 創(chuàng)建小車進程
for i in range(1,21):
time.sleep(random.randrange(2))
p2 = Process(target=car , args=(e,i))
lst.append(p2)
p2.start()
# 讓所有的小車全部跑完,把紅綠燈炸飛
print(lst)
for i in lst:
i.join()
print("關(guān)閉成功 .... ")
事件知識點:
# 阻塞事件 :
e = Event()生成事件對象e
e.wait()動態(tài)給程序加阻塞 , 程序當(dāng)中是否加阻塞完全取決于該對象中的is_set() [默認返回值是False]
# 如果是True 不加阻塞
# 如果是False 加阻塞
# 控制這個屬性的值
# set()方法 將這個屬性的值改成True
# clear()方法 將這個屬性的值改成False
# is_set()方法 判斷當(dāng)前的屬性是否為True (默認上來是False)
3. queue進程隊列
# ### 進程隊列(進程與子進程是相互隔離的,如果兩者想要進行通信,可以利用隊列實現(xiàn))
from multiprocessing import Process,Queue
# 引入線程模塊; 為了捕捉queue.Empty異常;
import queue
# 1.基本語法
"""順序: 先進先出,后進后出"""
# 創(chuàng)建進程隊列
q = Queue()
# put() 存放
q.put(1)
q.put(2)
q.put(3)
# get() 獲取
"""在獲取不到任何數(shù)據(jù)時,會出現(xiàn)阻塞"""
# print( q.get() )
# print( q.get() )
# print( q.get() )
# print( q.get() )
# get_nowait() 拿不到數(shù)據(jù)報異常
"""[windows]效果正常 [linux]不兼容"""
try:
print( q.get_nowait() )
print( q.get_nowait() )
print( q.get_nowait() )
print( q.get_nowait() )
except : #queue.Empty
pass
# put_nowait() 非阻塞版本的put
# 設(shè)置當(dāng)前隊列最大長度為3 ( 元素個數(shù)最多是3個 )
"""在指定隊列長度的情況下,如果塞入過多的數(shù)據(jù),會導(dǎo)致阻塞"""
# q2 = Queue(3)
# q2.put(111)
# q2.put(222)
# q2.put(333)
# q2.put(444)
"""使用put_nowait 在隊列已滿的情況下,塞入數(shù)據(jù)會直接報錯"""
q2 = Queue(3)
try:
q2.put_nowait(111)
q2.put_nowait(222)
q2.put_nowait(333)
q2.put_nowait(444)
except:
pass
# 2.進程間的通信IPC
def func(q):
# 2.子進程獲取主進程存放的數(shù)據(jù)
res = q.get()
print(res,"<22>")
# 3.子進程中存放數(shù)據(jù)
q.put("劉一縫")
if __name__ == "__main__":
q3 = Queue()
p = Process(target=func,args=(q3,))
p.start()
# 1.主進程存入數(shù)據(jù)
q3.put("趙鳳勇")
# 為了等待子進程把數(shù)據(jù)存放隊列后,主進程在獲取數(shù)據(jù);
p.join()
# 4.主進程獲取子進程存放的數(shù)據(jù)
print(q3.get() , "<33>")
小提示: 一般主進程比子進程執(zhí)行的快一些

隊列知識點:
# 進程間通信 IPC
# IPC Inter-Process Communication
# 實現(xiàn)進程之間通信的兩種機制:
# 管道 Pipe
# 隊列 Queue
# put() 存放
# get() 獲取
# get_nowait() 拿不到報異常
# put_nowait() 非阻塞版本的put
q.empty() 檢測是否為空 (了解)
q.full() 檢測是否已經(jīng)存滿 (了解)
4. 生產(chǎn)者消費者模型
# ### 生產(chǎn)者和消費者模型
"""
# 爬蟲案例
1號進程負責(zé)抓取其他多個網(wǎng)站中相關(guān)的關(guān)鍵字信息,正則匹配到隊列中存儲(mysql)
2號進程負責(zé)把隊列中的內(nèi)容拿取出來,將經(jīng)過修飾后的內(nèi)容布局到自個的網(wǎng)站中
1號進程可以理解成生產(chǎn)者
2號進程可以理解成消費者
從程序上來看
生產(chǎn)者負責(zé)存儲數(shù)據(jù) (put)
消費者負責(zé)獲取數(shù)據(jù) (get)
生產(chǎn)者和消費者比較理想的模型:
生產(chǎn)多少,消費多少 . 生產(chǎn)數(shù)據(jù)的速度 和 消費數(shù)據(jù)的速度 相對一致
"""
# 1.基礎(chǔ)版生產(chǎn)著消費者模型
"""問題 : 當(dāng)前模型,程序不能正常終止 """
"""
from multiprocessing import Process,Queue
import time,random
# 消費者模型
def consumer(q,name):
while True:
# 獲取隊列中的數(shù)據(jù)
food = q.get()
time.sleep(random.uniform(0.1,1))
print("{}吃了{}".format(name,food))
# 生產(chǎn)者模型
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
# 展示生產(chǎn)的數(shù)據(jù)
print( "{}生產(chǎn)了{}".format( name , food+str(i) ) )
# 存儲生產(chǎn)的數(shù)據(jù)在隊列中
q.put(food+str(i))
if __name__ == "__main__":
q = Queue()
p1 = Process( target=consumer,args=(q , "趙萬里") )
p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) )
p1.start()
p2.start()
p2.join()
"""
# 2.優(yōu)化模型
"""特點 : 手動在隊列的最后,加入標(biāo)識None, 終止消費者模型"""
"""
from multiprocessing import Process,Queue
import time,random
# 消費者模型
def consumer(q,name):
while True:
# 獲取隊列中的數(shù)據(jù)
food = q.get()
# 如果最后一次獲取的數(shù)據(jù)是None , 代表隊列已經(jīng)沒有更多數(shù)據(jù)可以獲取了,終止循環(huán);
if food is None:
break
time.sleep(random.uniform(0.1,1))
print("{}吃了{}".format(name,food))
# 生產(chǎn)者模型
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
# 展示生產(chǎn)的數(shù)據(jù)
print( "{}生產(chǎn)了{}".format( name , food+str(i) ) )
# 存儲生產(chǎn)的數(shù)據(jù)在隊列中
q.put(food+str(i))
if __name__ == "__main__":
q = Queue()
p1 = Process( target=consumer,args=(q , "趙萬里") )
p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) )
p1.start()
p2.start()
p2.join()
q.put(None) # 香蕉0 香蕉1 香蕉2 香蕉3 香蕉4 None
"""
# 3.多個生產(chǎn)者和消費者
""" 問題 : 雖然可以解決問題 , 但是需要加入多個None , 代碼冗余"""
from multiprocessing import Process,Queue
import time,random
# 消費者模型
def consumer(q,name):
while True:
# 獲取隊列中的數(shù)據(jù)
food = q.get()
# 如果最后一次獲取的數(shù)據(jù)是None , 代表隊列已經(jīng)沒有更多數(shù)據(jù)可以獲取了,終止循環(huán);
if food is None:
break
time.sleep(random.uniform(0.1,1))
print("{}吃了{}".format(name,food))
# 生產(chǎn)者模型
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
# 展示生產(chǎn)的數(shù)據(jù)
print( "{}生產(chǎn)了{}".format( name , food+str(i) ) )
# 存儲生產(chǎn)的數(shù)據(jù)在隊列中
q.put(food+str(i))
if __name__ == "__main__":
q = Queue()
p1 = Process( target=consumer,args=(q , "趙萬里") )
p1_1 = Process( target=consumer,args=(q , "趙世超") )
p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) )
p2_2 = Process( target=producer,args=(q , "趙鳳勇" , "大蒜" ) )
p1.start()
p1_1.start()
p2.start()
p2_2.start()
# 等待所有數(shù)據(jù)填充完畢
p2.join()
p2_2.join()
# 把None 關(guān)鍵字放在整個隊列的最后,作為跳出消費者循環(huán)的標(biāo)識符;
q.put(None) # 給第一個消費者加一個None , 用來終止
q.put(None) # 給第二個消費者加一個None , 用來終止
# ...

5. joinablequeue隊列使用
# ### JoinableQueue 隊列
"""
put 存放
get 獲取
task_done 計算器屬性值-1
join 配合task_done來使用 , 阻塞
put 一次數(shù)據(jù), 隊列的內(nèi)置計數(shù)器屬性值+1
get 一次數(shù)據(jù), 通過task_done讓隊列的內(nèi)置計數(shù)器屬性值-1
join: 會根據(jù)隊列計數(shù)器的屬性值來判斷是否阻塞或者放行
隊列計數(shù)器屬性是 等于 0 , 代碼不阻塞放行
隊列計數(shù)器屬性是 不等 0 , 意味著代碼阻塞
"""
from multiprocessing import JoinableQueue
jq = JoinableQueue()
jq.put("王同培") # +1
jq.put("王偉") # +2
print(jq.get())
print(jq.get())
# print(jq.get()) 阻塞
jq.task_done() # -1
jq.task_done() # -1
jq.join()
print(" 代碼執(zhí)行結(jié)束 .... ")
# ### 2.使用JoinableQueue 改造生產(chǎn)著消費者模型
from multiprocessing import Process,Queue
import time,random
# 消費者模型
def consumer(q,name):
while True:
# 獲取隊列中的數(shù)據(jù)
food = q.get()
time.sleep(random.uniform(0.1,1))
print("{}吃了{}".format(name,food))
# 讓隊列的內(nèi)置計數(shù)器屬性-1
q.task_done()
# 生產(chǎn)者模型
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
# 展示生產(chǎn)的數(shù)據(jù)
print( "{}生產(chǎn)了{}".format( name , food+str(i) ) )
# 存儲生產(chǎn)的數(shù)據(jù)在隊列中
q.put(food+str(i))
if __name__ == "__main__":
q = JoinableQueue()
p1 = Process( target=consumer,args=(q , "趙萬里") )
p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) )
p1.daemon = True
p1.start()
p2.start()
p2.join()
# 必須等待隊列中的所有數(shù)據(jù)全部消費完畢,再放行
q.join()
print("程序結(jié)束 ... ")
6. 總結(jié)
ipc可以讓進程之間進行通信 lock其實也讓進程之間進行通信了,多個進程去搶一把鎖,一個進程搶到 這 把鎖了,其他的進程就搶不到這把鎖了,進程通過socket底層互相發(fā) 消息,告訴其他進程當(dāng)前狀態(tài)已經(jīng)被鎖定了,不能再強了。 進程之間默認是隔離的,不能通信的,如果想要通信,必須通過ipc的 方式(lock、joinablequeue、Manager)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
python機器學(xué)習(xí)實戰(zhàn)之樹回歸詳解
這篇文章主要為大家詳細介紹了python機器學(xué)習(xí)實戰(zhàn)之樹回歸的相關(guān)代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12
Python實現(xiàn)統(tǒng)計圖像連通域的示例詳解
這篇文章主要為大家詳細介紹了如何利用Python實現(xiàn)統(tǒng)計圖像連通域的功能,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起了解一下2023-04-04
Python實現(xiàn)多并發(fā)訪問網(wǎng)站功能示例
這篇文章主要介紹了Python實現(xiàn)多并發(fā)訪問網(wǎng)站功能,結(jié)合具體實例形式分析了Python線程結(jié)合URL模塊并發(fā)訪問網(wǎng)站的相關(guān)操作技巧,需要的朋友可以參考下2017-06-06

