詳解python多線程之間的同步(一)
引言:
線程之間經(jīng)常需要協(xié)同工作,通過某種技術(shù),讓一個(gè)線程訪問某些數(shù)據(jù)時(shí),其它線程不能訪問這些數(shù)據(jù),直到該線程完成對數(shù)據(jù)的操作。這些技術(shù)包括臨界區(qū)(Critical Section),互斥量(Mutex),信號量(Semaphore),事件Event等。
Event
threading庫中的event對象通過使用內(nèi)部一個(gè)flag標(biāo)記,通過flag的True或者False的變化來進(jìn)行操作。
| 名稱 | 含義 |
| set( ) | 標(biāo)記設(shè)置為True |
| clear( ) | 標(biāo)記設(shè)置為False |
| is_set( ) | 標(biāo)記是否為True |
| wait(timeout=None) | 設(shè)置等待標(biāo)記為True的時(shí)長,None為無限等待。等到返回True,等不到返回False |
from threading import Thread,Event
import time
def creditor(event:Event):
print("什么時(shí)候還我錢")
event.wait()
print("我已經(jīng)等了很長時(shí)間了")
def debtor(event:Event,count=10):
print("可以寬裕幾天嗎?")
money=[]
while True:
print("先還你100")
time.sleep(0.5)
money.append(1)
if len(money)>count:
event.set()
break
print("我已經(jīng)還完你的錢了")
event=Event()
c=Thread(target=creditor,args=(event,))
d=Thread(target=debtor,args=(event,))
c.start()
d.start()
運(yùn)行結(jié)果如下所示:

可以看到creditor函數(shù)中因?yàn)閑vent.wait( )線程進(jìn)入等待狀態(tài),此時(shí)debtor線程進(jìn)入運(yùn)行,當(dāng)滿足條件時(shí)event.set( )將標(biāo)記設(shè)置為True,creditor線程開始運(yùn)行。誰wait就是等到flag變?yōu)門rue,或等到超時(shí)變?yōu)镕alse。不限制等待的個(gè)數(shù)。
wait的使用
from threading import Event,Thread
def Wait(event:Event,interval):
while not event.wait(interval):
print("waiting for you")
e=Event()
Thread(target=Wait,args=(e,3)).start()
e.wait(10)
e.set()
print("main exit")

主線程一開始就wait 10s,Waiting線程等待3s返回False,進(jìn)入循環(huán)打印"waiting for you",重復(fù)3次,然后主線程set了,這時(shí)候Waiting線程變?yōu)門rue,不再進(jìn)入循環(huán)。
Lock
凡是存在資源爭用的地方都可以使用鎖,從而保證只有一個(gè)使用者可以完全使用這個(gè)資源
現(xiàn)在要生產(chǎn)10個(gè)杯子,由10個(gè)工人開始生產(chǎn)
import threading
import time
cups=[]
def worker(count=10):
print("我是{},我開始生產(chǎn)了".format(threading.current_thread().name))
flag=False
while True:
if len(cups)>count:
flag=True
time.sleep(0.05)
if not flag:
cups.append(1)
if flag:
break
print("finished.cups={}".format(len(cups)))
for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()
運(yùn)行結(jié)果如下圖所示:

我們明明只需要到1000就會break,但是結(jié)果卻到了1010個(gè),這就是因?yàn)橛?0個(gè)線程,其中每個(gè)線程都在增加,但是增加后的數(shù)目,其他線程并不會知道(每個(gè)線程通過len函數(shù)拿到數(shù)量,但是剛拿到數(shù)字,其他線程就立即更新了)
這個(gè)時(shí)候我們就需要鎖lock來實(shí)現(xiàn)了,一旦線程獲得鎖,其他試圖獲取鎖的線程將被阻塞
| 名稱 | 含義 |
| acquire(blocking=True,timeout=-1) | 默認(rèn)阻塞,阻塞可以設(shè)置超時(shí)時(shí)間。非阻塞時(shí),timeout禁止設(shè)置。成功獲取鎖,返回True,否則返回False |
| release( ) | 釋放鎖??梢詮娜魏尉€程釋放。已上鎖的鎖,會拋出RuntimeError異常 |
加鎖的實(shí)現(xiàn):
import threading
import time
cups=[]
lock=threading.Lock()
def worker(count=10):
print("我是{},我開始生產(chǎn)了".format(threading.current_thread().name))
flag=False
while True:
lock.acquire()
if len(cups)>=count:
flag=True
time.sleep(0.005)
if not flag:
cups.append(1)
lock.release()
if flag:
break
print("finished,cups={}".format(len(cups)))
for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()
運(yùn)行結(jié)果如圖所示:

一般來說加鎖后還需要一些代碼實(shí)現(xiàn),在釋放鎖之前還有可能拋出異常,一旦出現(xiàn)異常,鎖無法釋放,但是當(dāng)前這個(gè)線程會因?yàn)檫@個(gè)異常而終止,這樣會產(chǎn)生死鎖,因此使用時(shí)要使用如下的方法:
1,使用try...finally語句保證鎖的釋放
2,with安全上下文管理(鎖對象支持上下文管理)
計(jì)數(shù)器類,用來加,減。
import threading import time class Counter: def __init__(self): self._val = 0 self.__lock = threading.Lock() @property def value(self): return self._val def inc(self): try: self.__lock.acquire() self._val += 1 finally: self.__lock.release() def dec(self): with self.__lock: self._val -= 1 def run(c: Counter, count=100): for _ in range(count): for i in range(-50, 50): if i < 0: c.dec() else: c.inc() c = Counter() c1 = 10 c2 = 1000 for i in range(c1): threading.Thread(target=run, args=(c, c2)).start() while True: if threading.active_count() == 1: print(c.value) break
啟動(dòng)了10個(gè)線程,1000次從-50到50進(jìn)行加減,最后得到0,如果沒有加鎖處理的話,得到的結(jié)果未必是自己想要的。
鎖的使用場景:
鎖適用于訪問和修改同一個(gè)資源的時(shí)候,引起資源爭用的情況下。使用鎖的注意事項(xiàng):
- 1,少用鎖,除非有必要。多線程訪問加鎖的資源時(shí),由于鎖的存在,實(shí)際就變成了串行。
- 2,加鎖時(shí)間越短越好,不需要就立即釋放鎖。
- 3,一定要避免死鎖,使用with或者try...finally。
非阻塞鎖使用
import threading
import time
def worker(tasks):
for task in tasks:
time.sleep(0.001)
if task.lock.acquire(False):
print("{} {} begin to start".format(threading.current_thread(),task.name))
else:
print("{} {} is working".format(threading.current_thread(),task.name))
class Task:
def __init__(self,name):
self.name=name
self.lock=threading.Lock()
tasks=[Task('task-{}'.format(x)) for x in range(10)]
for i in range(5):
threading.Thread(target=worker,name="worker-{}".format(i),args=(tasks,)).start()
運(yùn)行結(jié)果如下圖所示:

總共開啟了5個(gè)線程,每個(gè)線程處理10個(gè)任務(wù),因?yàn)樵趇f語句里面,task.lock.acquire(False),所以每個(gè)線程只有拿到鎖是True,其他的線程不會阻塞會返回False。打印"is working"。
以上所述是小編給大家介紹的python多線程之間的同步詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時(shí)回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
TensorFlow tf.nn.softmax_cross_entropy_with_logits的用法
這篇文章主要介紹了TensorFlow tf.nn.softmax_cross_entropy_with_logits的用法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04
Python 基于 pygame 實(shí)現(xiàn)輪播圖動(dòng)畫效果
在Python中可以適應(yīng)第三方庫pygame來實(shí)現(xiàn)輪播圖動(dòng)畫的效果,使用pygame前需確保其已經(jīng)安裝,本文通過實(shí)例代碼介紹Python 基于 pygame 實(shí)現(xiàn)輪播圖動(dòng)畫效果,感興趣的朋友跟隨小編一起看看吧2024-03-03
用Python監(jiān)控你的朋友都在瀏覽哪些網(wǎng)站?
今天教各位小伙伴一個(gè)黑科技,用Python監(jiān)控你的朋友都在瀏覽哪些網(wǎng)站,文中有非常詳細(xì)的介紹,對正在學(xué)習(xí)python的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05
python 根據(jù)excel中顏色區(qū)分讀取的操作
這篇文章主要介紹了python 根據(jù)excel中顏色區(qū)分讀取的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03
使用celery執(zhí)行Django串行異步任務(wù)的方法步驟
這篇文章主要介紹了使用celery執(zhí)行Django串行異步任務(wù),文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Django具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06
python實(shí)現(xiàn)密碼強(qiáng)度校驗(yàn)
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)密碼強(qiáng)度校驗(yàn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-03-03
Python基礎(chǔ)之函數(shù)用法實(shí)例詳解
這篇文章主要介紹了Python中函數(shù)用法,包括了函數(shù)的創(chuàng)建、定義、參數(shù)等,需要的朋友可以參考下2014-09-09

