Python多線程與同步機(jī)制淺析
線程實(shí)現(xiàn)
Python中線程有兩種方式:函數(shù)或者用類來包裝線程對象。threading模塊中包含了豐富的多線程支持功能:
- threading.currentThread(): 返回當(dāng)前線程;
- threading.enumerate(): 返回包含正在運(yùn)行的線程列表;
- threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量,與len(threading.enumerate())等價。
Thread類
通過Thread類來處理線程,類中提供的一些方法:
- run(): 用以表示線程執(zhí)行的方法(可重載實(shí)現(xiàn)實(shí)際功能);
- start(): 啟動線程;
- join([time]): 等待線程中止(或者超時);
- isAlive(): 返回線程是否活動;
- getName(): 返回線程名;
- setName(): 設(shè)置線程名;
- setDaemon(True):設(shè)置為后臺進(jìn)程(必須在start調(diào)用前設(shè)定)。
函數(shù)方式
通過Thread直接構(gòu)造線程,然后通過start方法啟動線程:
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)
各參數(shù)說明:
- group:指定線程隸屬的線程組(當(dāng)前忽略);
- target:指定線程要調(diào)度的目標(biāo)方法(即實(shí)現(xiàn)功能的函數(shù));
- args:傳遞給目標(biāo)方法的參數(shù)(以元組的方式);
- kwargs:傳遞給目標(biāo)方法的參數(shù)(以字典的方式);
- daemon:指定線程是否為后臺線程。
def simpleRoutine(name, delay):
print(f"routine {name} starting...")
time.sleep(delay)
print(f"routine {name} finished")
if __name__ == '__main__':
thrOne = threading.Thread(target=simpleRoutine, args=("First", 1))
thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2))
thrOne.start()
thrTwo.start()
thrOne.join()
thrTwo.join()
繼承方式
直接繼承Thread,創(chuàng)建一個新的子類(主要實(shí)現(xiàn)run方法):
class SimpleThread (threading.Thread):
def __init__(self, name, delay):
# threading.Thread.__init__(self)
super().__init__()
self.name = name
self.delay = delay
def run(self):
print(f"thread {self.name} starting...")
time.sleep(self.delay)
print(f"thread {self.name} finished")
if __name__ == '__main__':
thrOne = SimpleThread("First", 2)
thrTwo = SimpleThread("Second", 2)
thrOne.start()
thrTwo.start()
thrOne.join()
thrTwo.join()
同步機(jī)制
當(dāng)多個線程同時修改同一條數(shù)據(jù)時可能會出現(xiàn)臟數(shù)據(jù);所以,就需要線程鎖,即同一時刻只允許一個線程執(zhí)行操作。
同步鎖Lock
threading提供了Lock和RLock(可重入鎖)兩個類,它們都提供了如下兩個方法來加鎖和釋放鎖:
- acquire(blocking=True, timeout=-1):加鎖,其中 timeout 參數(shù)指定加鎖多少秒。
- release():釋放鎖。
兩種使用鎖的方式:
gCount = 0
def PlusOne(locker):
global gCount
with locker:
gCount += 1、
def MinusOne(locker):
global gCount
if locker.acquire():
gCount -= 1
locker.release()
條件變量Condition
Condition對象內(nèi)部維護(hù)了一個鎖(構(gòu)造時可傳遞一個Lock/RLock對象,否則內(nèi)部會自行創(chuàng)建一個RLock)和一個waiting池:
- 通過acquire獲得Condition對象;
- 當(dāng)調(diào)用wait方法時,線程會釋放Condition內(nèi)部的鎖并進(jìn)入blocked狀態(tài),同時在waiting池中記錄這個線程;
- 當(dāng)調(diào)用notify方法時,Condition對象會從waiting池中挑選一個線程,通知其調(diào)用acquire方法嘗試取到鎖。
Condition對象:
__init__(self,lock=None):Condition類總是與一個鎖相關(guān)聯(lián)(若不指定lock參數(shù),會自動創(chuàng)建一個與之綁定的RLock對象);
acquire(timeout):調(diào)用關(guān)聯(lián)鎖的acquire()方法;
release():調(diào)用關(guān)聯(lián)鎖的release()方法
wait(timeout):線程掛起,直到收到一個notify通知或超時才會被喚醒;必須在已獲得鎖的前提下調(diào)用;
notify(n=1):喚醒waiting池中的n個正在等待的線程并通知它:
- 收到通知的線程將自動調(diào)用acquire()方法嘗試加鎖;
- 若waiting池中有多個線程,隨機(jī)選擇n個喚醒;
- 必須在已獲得鎖的前提下調(diào)用,否則將引發(fā)錯誤。
notify_all():通知所有線程。
class Producer(threading.Thread):
def __init__(self, cond, storage):
threading.Thread.__init__(self)
self.cond = cond
self.storage = storage
def run(self):
label = 1
while True:
with self.cond:
if len(self.storage) < 10:
self.storage.append(label)
print(f"<- Produce {label} product")
label += 1
self.cond.notify(2)
else:
print(f"<- storage full: Has Produced {label - 1} product")
self.cond.notify_all()
self.cond.wait()
time.sleep(0.4)
class Consumer(threading.Thread):
def __init__(self, name, cond, storage):
threading.Thread.__init__(self)
self.name = name
self.cond = cond
self.storage = storage
def run(self):
while True:
if self.cond.acquire():
if len(self.storage) > 1:
pro = self.storage.pop(0)
print(f"-> {self.name} consumed {pro}")
self.cond.notify()
else:
print(f"-> {self.name} storage empty: no product to consume")
self.cond.wait()
self.cond.release()
time.sleep(1)信號量Semaphore
信號量對象內(nèi)部維護(hù)一個計(jì)數(shù)器:
acquire(blocking=True,timeout=None)時減1,當(dāng)計(jì)數(shù)為0就阻塞請求的線程;release()時加1,當(dāng)計(jì)數(shù)大于0恢復(fù)被阻塞的線程;
threading中有Semaphore和BoundedSemaphore兩個信號量;BoundedSemaphore限制了release的次數(shù),任何時候計(jì)數(shù)器的值,都不不能大于初始值(release時會檢測計(jì)數(shù)器的值,若大于等于初始值,則拋出ValueError異常)。
通過Semaphore維護(hù)生產(chǎn)(release一個)、消費(fèi)(acquire一個)量:
# products = threading.Semaphore(0)
def produceOne(label, sem: threading.Semaphore):
sem.release()
print(f"{label} produce one")
def consumeOne(label, sem: threading.Semaphore):
sem.acquire()
print(f"{label} consume one")
通過BoundedSemaphore來控制并發(fā)數(shù)量(最多有Semaphore初始值數(shù)量的線程并發(fā)):
# runner = threading.BoundedSemaphore(3)
def runBound(name, sem: threading.BoundedSemaphore):
with sem:
print(f"{name} is running")
time.sleep(1)
print(f"{name} finished")
事件Event
事件對象內(nèi)部有個標(biāo)志字段,用于線程等待事件的發(fā)生:
- isSet():返回event的狀態(tài)值;
- wait():狀態(tài)為False時,一直阻塞;否則立即返回;
- set(): 設(shè)置狀態(tài)值為True,激活所有被阻塞的線程;
- clear():恢復(fù)狀態(tài)值為False。
多線程等待事件發(fā)生,然后開始執(zhí)行:
def waiters(name, evt: threading.Event):
evt.wait()
print(f"{name} is running")
time.sleep(1)
print(f"{name} finished")
def starting(evt: threading.Event):
evt.set()
print("event is set")
屏障Barrier
屏障用于設(shè)定等待線程數(shù)量,當(dāng)數(shù)量達(dá)到指定值時,開始執(zhí)行:
threading.Barrier(parties, action=None, timeout=None)
屏障屬性與方法:
- wait(timeout=None):等待通過屏障;線程被阻塞,直到阻塞的數(shù)量達(dá)到parties時,被阻塞的線程被同時全部釋放;
- reset():重置屏障到默認(rèn)的空狀態(tài);
- abort():將障礙置為斷開狀態(tài);導(dǎo)致等待的線程引發(fā)BrokenBarrierError異常;
- partier():通過障礙所需的線程數(shù);
- n_waiting():當(dāng)前在屏障中等待的線程數(shù);
- broken():如果屏障處于斷開狀態(tài),則返回True。
def waitBarrier(name, barr: threading.Barrier):
print(f"{name} waiting for open")
try:
barr.wait()
print(f"{name} running")
time.sleep(5)
except threading.BrokenBarrierError:
print(f"{name} exception")
print(f"{name} finished")
GIL全局解釋器鎖
GIL(Global Interpreter Lock,全局解釋器鎖);cpython中,某個線程想要執(zhí)行,必須先拿到GIL(可以把GIL看作是“通行證”)。每次釋放GIL鎖,線程都要進(jìn)行鎖競爭,切換線程,會消耗資源。
由于GIL鎖的存在,python里一個進(jìn)程永遠(yuǎn)只能同時執(zhí)行一個線程(拿到GIL的線程),這就是為什么在多核CPU上,python的多線程效率并不高:
- CPU密集型代碼:由于計(jì)算工作多,會很快用完時間片,然后觸發(fā)GIL的釋放與再競爭;
- IO密集型代碼(文件處理、網(wǎng)絡(luò)爬蟲等):多線程能夠有效提升效率(單線程下有IO操作會進(jìn)行IO等待,造成不必要的時間浪費(fèi),而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費(fèi)CPU的資源,從而能提升程序執(zhí)行效率)。
python在使用多線程的時候,調(diào)用的是c語言的原生線程:
- 拿到公共數(shù)據(jù)
- 申請GIL
- python解釋器調(diào)用os原生線程
- os操作cpu執(zhí)行運(yùn)算
- 當(dāng)線程執(zhí)行時間到后,就進(jìn)行切換(context switch)
到此這篇關(guān)于Python多線程與同步機(jī)制淺析的文章就介紹到這了,更多相關(guān)Python多線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python調(diào)用C++,通過Pybind11制作Python接口
今天小編就為大家分享一篇關(guān)于Python調(diào)用C++,通過Pybind11制作Python接口,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10
pytorch 更改預(yù)訓(xùn)練模型網(wǎng)絡(luò)結(jié)構(gòu)的方法
今天小編就為大家分享一篇pytorch 更改預(yù)訓(xùn)練模型網(wǎng)絡(luò)結(jié)構(gòu)的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08
python中的循環(huán)結(jié)構(gòu)問題
這篇文章主要介紹了python中的循環(huán)結(jié)構(gòu)問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03
我們知道python只定義了6種數(shù)據(jù)類型,字符串,整數(shù),浮點(diǎn)數(shù),列表,元組,字典。但是C語言中有些字節(jié)型的變量,在python中該如何實(shí)現(xiàn)呢?這點(diǎn)頗為重要,特別是要在網(wǎng)絡(luò)上進(jìn)行數(shù)據(jù)傳輸?shù)脑挕?/div> 2014-06-06
python 裝飾器功能以及函數(shù)參數(shù)使用介紹
之前學(xué)習(xí)編程語言大多也就是學(xué)的很淺很淺,基本上也是很少涉及到裝飾器這些的類似的內(nèi)容??偸怯X得是一樣很神奇的東西,舍不得學(xué)(嘿嘿)。今天看了一下書籍。發(fā)現(xiàn)道理還是很簡單的2012-01-01
python 爬蟲網(wǎng)頁登陸的簡單實(shí)現(xiàn)
這篇文章主要介紹了python 爬蟲網(wǎng)頁登陸的簡單實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
python 如何對Series中的每一個數(shù)據(jù)做運(yùn)算
這篇文章主要介紹了python 實(shí)現(xiàn)對Series中的每一個數(shù)據(jù)做運(yùn)算操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-05-05最新評論

