Python3標(biāo)準(zhǔn)庫(kù)之threading進(jìn)程中管理并發(fā)操作方法
1. threading進(jìn)程中管理并發(fā)操作
threading模塊提供了管理多個(gè)線程執(zhí)行的API,允許程序在同一個(gè)進(jìn)程空間并發(fā)的運(yùn)行多個(gè)操作。
1.1 Thread對(duì)象
要使用Thread,最簡(jiǎn)單的方法就是用一個(gè)目標(biāo)函數(shù)實(shí)例化一個(gè)Thread對(duì)象,并調(diào)用start()讓它開(kāi)始工作。
import threading
def worker():
"""thread worker function"""
print('Worker')
threads = []
for i in range(5):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
輸出有5行,每一行都是"Worker"。

如果能夠創(chuàng)建一個(gè)線程,并向它傳遞參數(shù)告訴它要完成什么工作,那么這會(huì)很有用。任何類(lèi)型的對(duì)象都可以作為參數(shù)傳遞到線程。下面的例子傳遞了一個(gè)數(shù),線程將打印出這個(gè)數(shù)。
import threading
def worker(num):
"""thread worker function"""
print('Worker: %s' % num)
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
現(xiàn)在這個(gè)整數(shù)參數(shù)會(huì)包含在各線程打印的消息中。

1.2 確定當(dāng)前線程
使用參數(shù)來(lái)標(biāo)識(shí)或命名線程很麻煩,也沒(méi)有必要。每個(gè)Thread實(shí)例都有一個(gè)帶有默認(rèn)值的名,該默認(rèn)值可以在創(chuàng)建線程時(shí)改變。如果服務(wù)器進(jìn)程中有多個(gè)服務(wù)線程處理不同的操作,那么在這樣的服務(wù)器進(jìn)程中,對(duì)線程命名就很有用。
import threading import time def worker(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.2) print(threading.current_thread().getName(), 'Exiting') def my_service(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.3) print(threading.current_thread().getName(), 'Exiting') t = threading.Thread(name='my_service', target=my_service) w = threading.Thread(name='worker', target=worker) w2 = threading.Thread(target=worker) # use default name w.start() w2.start() t.start()
調(diào)試輸出的每一行中包含有當(dāng)前線程的名。線程名列中有"Thread-1"的行對(duì)應(yīng)未命名的線程w2。

大多數(shù)程序并不使用print來(lái)進(jìn)行調(diào)試。logging模塊支持將線程名嵌入到各個(gè)日志消息中(使用格式化代碼%(threadName)s)。通過(guò)把線程名包含在日志消息中,就能跟蹤這些消息的來(lái)源。
import logging
import threading
import time
def worker():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def my_service():
logging.debug('Starting')
time.sleep(0.3)
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()
而且logging是線程安全的,所以來(lái)自不同線程的消息在輸出中會(huì)有所區(qū)分。

1.3 守護(hù)與非守護(hù)線程
到目前為止,示例程序都在隱式地等待所有線程完成工作之后才退出。不過(guò),程序有時(shí)會(huì)創(chuàng)建一個(gè)線程作為守護(hù)線程(daemon),這個(gè)線程可以一直運(yùn)行而不阻塞主程序退出。
如果一個(gè)服務(wù)不能很容易地中斷線程,或者即使讓線程工作到一半時(shí)中止也不會(huì)造成數(shù)據(jù)損失或破壞(例如,為一個(gè)服務(wù)監(jiān)控工具生成“心跳”的線程),那么對(duì)于這些服務(wù),使用守護(hù)線程就很有用。要標(biāo)志一個(gè)線程為守護(hù)線程,構(gòu)造線程時(shí)便要傳入daemon=True或者要調(diào)用它的setDaemon()方法并提供參數(shù)True。默認(rèn)情況下線程不作為守護(hù)線程。
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
這個(gè)代碼的輸出中不包含守護(hù)線程的“Exiting“消息,因?yàn)樵趶膕leep()調(diào)用喚醒守護(hù)線程之前,所有非守護(hù)線程(包括主線程)已經(jīng)退出。

要等待一個(gè)守護(hù)線程完成工作,需要使用join()方法。
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join()
t.join()
使用join()等待守護(hù)線程退出意味著它有機(jī)會(huì)生成它的"Exiting"消息。

默認(rèn)地,join()會(huì)無(wú)限阻塞?;蛘?,還可以傳入一個(gè)浮點(diǎn)值,表示等待線程在多長(zhǎng)時(shí)間(秒數(shù))后變?yōu)椴换顒?dòng)。即使線程在這個(gè)時(shí)間段內(nèi)未完成,join()也會(huì)返回。
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()
由于傳人的超時(shí)時(shí)間小于守護(hù)線程睡眠的時(shí)間,所以join()返回之后這個(gè)線程仍是"活著"。

1.4 枚舉所有線程
沒(méi)有必要為所有守護(hù)線程維護(hù)一個(gè)顯示句柄來(lái)確保它們?cè)谕顺鲋鬟M(jìn)程之前已經(jīng)完成。
enumerate()會(huì)返回活動(dòng) Thread實(shí)例的一個(gè)列表。這個(gè)列表也包括當(dāng)前線程,由于等待當(dāng)前線程終止(join)會(huì)引入一種死鎖情況,所以必須跳過(guò)。
import random
import threading
import time
import logging
def worker():
"""thread worker function"""
pause = random.randint(1, 5) / 10
logging.debug('sleeping %0.2f', pause)
time.sleep(pause)
logging.debug('ending')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
t = threading.Thread(target=worker, daemon=True)
t.start()
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is main_thread:
continue
logging.debug('joining %s', t.getName())
t.join()
由于工作線程睡眠的時(shí)間量是隨機(jī)的,所以這個(gè)程序的輸出可能有變化。

1.5 派生線程
開(kāi)始時(shí),Thread要完成一些基本初始化,然后調(diào)用其run()方法,這會(huì)調(diào)用傳遞到構(gòu)造函數(shù)的目標(biāo)函數(shù)。要?jiǎng)?chuàng)建Thread的一個(gè)子類(lèi),需要覆蓋run()來(lái)完成所需的工作。
import threading
import logging
class MyThread(threading.Thread):
def run(self):
logging.debug('running')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThread()
t.start()
run()的返回值將被忽略。

由于傳遞到Thread構(gòu)造函數(shù)的args和kwargs值保存在私有變量中(這些變量名都有前綴),所以不能很容易地從子類(lèi)訪問(wèn)這些值。要向一個(gè)定制的線程類(lèi)型傳遞參數(shù),需要重新定義構(gòu)造函數(shù),將這些值保存在子類(lèi)可見(jiàn)的一個(gè)實(shí)例屬性中。
import threading
import logging
class MyThreadWithArgs(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
logging.debug('running with %s and %s',
self.args, self.kwargs)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
t.start()
MyThreadwithArgs使用的API與Thread相同,不過(guò)類(lèi)似于其他定制類(lèi),這個(gè)類(lèi)可以輕松地修改構(gòu)造函數(shù)方法,以取得更多參數(shù)或者與線程用途更直接相關(guān)的不同參數(shù)。

1.6 定時(shí)器線程
有時(shí)出于某種原因需要派生Thread,Timer就是這樣一個(gè)例子,Timer也包含在threading中。Timer在一個(gè)延遲之后開(kāi)始工作,而且可以在這個(gè)延遲期間內(nèi)的任意時(shí)刻被取消。
import threading
import time
import logging
def delayed():
logging.debug('worker running')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')
logging.debug('starting timers')
t1.start()
t2.start()
logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')
這個(gè)例子中,第二個(gè)定時(shí)器永遠(yuǎn)不會(huì)運(yùn)行,看起來(lái)第一個(gè)定時(shí)器在主程序的其余部分完成之后還會(huì)運(yùn)行。由于這不是一個(gè)守護(hù)線程,所以在主線程完成時(shí)其會(huì)隱式退出。

1.7 線程間傳送信號(hào)
盡管使用多線程的目的是并發(fā)地運(yùn)行單獨(dú)的操作,但有時(shí)也需要在兩個(gè)或多個(gè)線程中同步操作。事件對(duì)象是實(shí)現(xiàn)線程間安全通信的一種簡(jiǎn)單方法。Event管理一個(gè)內(nèi)部標(biāo)志,調(diào)用者可以用set()和clear()方法控制這個(gè)標(biāo)志。其他線程可以使用wait()暫停,直到這個(gè)標(biāo)志被設(shè)置,可有效地阻塞進(jìn)程直至允許這些線程繼續(xù)。
import logging
import threading
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
logging.debug('wait_for_event starting')
event_is_set = e.wait()
logging.debug('event set: %s', event_is_set)
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
while not e.is_set():
logging.debug('wait_for_event_timeout starting')
event_is_set = e.wait(t)
logging.debug('event set: %s', event_is_set)
if event_is_set:
logging.debug('processing event')
else:
logging.debug('doing other work')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
e = threading.Event()
t1 = threading.Thread(
name='block',
target=wait_for_event,
args=(e,),
)
t1.start()
t2 = threading.Thread(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
t2.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')
wait()方法取一個(gè)參數(shù),表示等待事件的時(shí)間(秒數(shù)),達(dá)到這個(gè)時(shí)間后就超時(shí)。它會(huì)返回一個(gè)布爾值,指示事件是否已設(shè)置,使調(diào)用者知道wait()為什么返回??梢詫?duì)事件單獨(dú)地使用is_set()方法而不必?fù)?dān)心阻塞。
在這個(gè)例子中,wait_for_event_timeout()將檢查事件狀態(tài)而不會(huì)無(wú)限阻塞。wait_for_event()在wait()調(diào)用的位置阻塞,事件狀態(tài)改變之前它不會(huì)返回。

1.8 控制資源訪問(wèn)
除了同步線程操作,還有一點(diǎn)很重要,要能夠控制對(duì)共享資源的訪問(wèn),從而避免破壞或丟失數(shù)據(jù)。Python的內(nèi)置數(shù)據(jù)結(jié)構(gòu)(列表、字典等)是線程安全的,這是Python使用原子字節(jié)碼來(lái)管理這些數(shù)據(jù)結(jié)構(gòu)的一個(gè)副作用(更新過(guò)程中不會(huì)釋放保護(hù)Python內(nèi)部數(shù)據(jù)結(jié)構(gòu)的全局解釋器鎖GIL(Global Interpreter Lock))。Python中實(shí)現(xiàn)的其他數(shù)據(jù)結(jié)構(gòu)或更簡(jiǎn)單的類(lèi)型(如整數(shù)和浮點(diǎn)數(shù))則沒(méi)有這個(gè)保護(hù)。要保證同時(shí)安全地訪問(wèn)一個(gè)對(duì)象,可以使用一個(gè)Lock對(duì)象。
import logging
import random
import threading
import time
class Counter:
def __init__(self, start=0):
self.lock = threading.Lock()
self.value = start
def increment(self):
logging.debug('Waiting for lock')
self.lock.acquire()
try:
logging.debug('Acquired lock')
self.value = self.value + 1
finally:
self.lock.release()
def worker(c):
for i in range(2):
pause = random.random()
logging.debug('Sleeping %0.02f', pause)
time.sleep(pause)
c.increment()
logging.debug('Done')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
counter = Counter()
for i in range(2):
t = threading.Thread(target=worker, args=(counter,))
t.start()
logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is not main_thread:
t.join()
logging.debug('Counter: %d', counter.value)
在這個(gè)例子中,worker()函數(shù)使一個(gè)Counter實(shí)例遞增,這個(gè)實(shí)例管理著一個(gè)Lock,以避免兩個(gè)線程同時(shí)改變其內(nèi)部狀態(tài)。如果沒(méi)有使用Lock,就有可能丟失一次對(duì)value屬性的修改。

要確定是否有另一個(gè)線程請(qǐng)求這個(gè)鎖而不影響當(dāng)前線程,可以向acquire()的blocking參數(shù)傳入False。在下一個(gè)例子中,worker()想要分別得到3次鎖,并統(tǒng)計(jì)為得到鎖而嘗試的次數(shù)。與此同時(shí),lock_holder()在占有和釋放鎖之間循環(huán),每個(gè)狀態(tài)會(huì)短暫暫停,以模擬負(fù)載情況。
import logging
import threading
import time
def lock_holder(lock):
logging.debug('Starting')
while True:
lock.acquire()
try:
logging.debug('Holding')
time.sleep(0.5)
finally:
logging.debug('Not holding')
lock.release()
time.sleep(0.5)
def worker(lock):
logging.debug('Starting')
num_tries = 0
num_acquires = 0
while num_acquires < 3:
time.sleep(0.5)
logging.debug('Trying to acquire')
have_it = lock.acquire(0)
try:
num_tries += 1
if have_it:
logging.debug('Iteration %d: Acquired',
num_tries)
num_acquires += 1
else:
logging.debug('Iteration %d: Not acquired',
num_tries)
finally:
if have_it:
lock.release()
logging.debug('Done after %d iterations', num_tries)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
holder = threading.Thread(
target=lock_holder,
args=(lock,),
name='LockHolder',
daemon=True,
)
holder.start()
worker = threading.Thread(
target=worker,
args=(lock,),
name='Worker',
)
worker.start()
worker()需要超過(guò)3次迭代才能得到3次鎖。

1.8.1 再入鎖
正常的Lock對(duì)象不能請(qǐng)求多次,即使是由同一個(gè)線程請(qǐng)求也不例外。如果同一個(gè)調(diào)用鏈中的多個(gè)函數(shù)訪問(wèn)一個(gè)鎖,則可能會(huì)產(chǎn)生我們不希望的副作用。
import threading
lock = threading.Lock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
在這里,對(duì)第二個(gè)acquire()調(diào)用給定超時(shí)值為0,以避免阻塞,因?yàn)殒i已經(jīng)被第一個(gè)調(diào)用獲得。

如果同一個(gè)線程的不同代碼需要"重新獲得"鎖,那么在這種情況下要使用RLock。
import threading
lock = threading.RLock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
與前面的例子相比,對(duì)代碼唯一的修改就是用RLock替換Lock。

1.8.2 鎖作為上下文管理器
鎖實(shí)現(xiàn)了上下文管理器API,并與with語(yǔ)句兼容。使用with則不再需要顯式地獲得和釋放鎖。
import threading
import logging
def worker_with(lock):
with lock:
logging.debug('Lock acquired via with')
def worker_no_with(lock):
lock.acquire()
try:
logging.debug('Lock acquired directly')
finally:
lock.release()
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))
w.start()
nw.start()
函數(shù)worker_with()和worker_no_with()用等價(jià)的方式管理鎖。

1.9 同步線程
除了使用Event,還可以通過(guò)使用一個(gè)Condition對(duì)象來(lái)同步線程。由于Condition使用了一個(gè)Lock,所以它可以綁定到一個(gè)共享資源,允許多個(gè)線程等待資源更新。在下一個(gè)例子中,consumer()線程要等待設(shè)置了Condition才能繼續(xù)。producer()線程負(fù)責(zé)設(shè)置條件,以及通知其他線程繼續(xù)。
import logging
import threading
import time
def consumer(cond):
"""wait for the condition and use the resource"""
logging.debug('Starting consumer thread')
with cond:
cond.wait()
logging.debug('Resource is available to consumer')
def producer(cond):
"""set up the resource to be used by the consumer"""
logging.debug('Starting producer thread')
with cond:
logging.debug('Making resource available')
cond.notifyAll()
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
args=(condition,))
p = threading.Thread(name='p', target=producer,
args=(condition,))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
這些線程使用with來(lái)獲得與Condition關(guān)聯(lián)的鎖。也可以顯式地使用acquire()和release()方法。

屏障(barrier)是另一種線程同步機(jī)制。Barrier會(huì)建立一個(gè)控制點(diǎn),所有參與線程會(huì)在這里阻塞,直到所有這些參與“方”都到達(dá)這一點(diǎn)。采用這種方法,線程可以單獨(dú)啟動(dòng)然后暫停,直到所有線程都準(zhǔn)備好才可以繼續(xù)。
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'waiting for barrier with {} others'.format(
barrier.n_waiting))
worker_id = barrier.wait()
print(threading.current_thread().name, 'after barrier',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.1)
for t in threads:
t.join()
在這個(gè)例子中,Barrier被配置為會(huì)阻塞線程,直到3個(gè)線程都在等待。滿足這個(gè)條件時(shí),所有線程被同時(shí)釋放從而越過(guò)這個(gè)控制點(diǎn)。wait()的返回值指示了釋放的參與線程數(shù),可以用來(lái)限制一些線程做清理資源等動(dòng)作。

Barrier的abort()方法會(huì)使所有等待線程接收一個(gè)BrokenBarrierError。如果線程在wait()上被阻塞而停止處理,這就允許線程完成清理工作。
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'waiting for barrier with {} others'.format(
barrier.n_waiting))
try:
worker_id = barrier.wait()
except threading.BrokenBarrierError:
print(threading.current_thread().name, 'aborting')
else:
print(threading.current_thread().name, 'after barrier',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS + 1)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.1)
barrier.abort()
for t in threads:
t.join()
這個(gè)例子將Barrier配置為多加一個(gè)線程,即需要比實(shí)際啟動(dòng)的線程再多一個(gè)參與線程,所以所有線程中的處理都會(huì)阻塞。在被阻塞的各個(gè)線程中,abort()調(diào)用會(huì)產(chǎn)生一個(gè)異常。

1.10 限制資源的并發(fā)訪問(wèn)
有時(shí)可能需要允許多個(gè)工作線程同時(shí)訪問(wèn)一個(gè)資源,但要限制總數(shù)。例如,連接池支持同時(shí)連接,但數(shù)目可能是固定的,或者一個(gè)網(wǎng)絡(luò)應(yīng)用可能支持固定數(shù)目的并發(fā)下載。這些連接就可以使用Semaphore來(lái)管理。
import logging
import threading
import time
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.active = []
self.lock = threading.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
logging.debug('Running: %s', self.active)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
logging.debug('Running: %s', self.active)
def worker(s, pool):
logging.debug('Waiting to join the pool')
with s:
name = threading.current_thread().getName()
pool.makeActive(name)
time.sleep(0.1)
pool.makeInactive(name)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
t = threading.Thread(
target=worker,
name=str(i),
args=(s, pool),
)
t.start()
在這個(gè)例子中,ActivePool類(lèi)只作為一種便利方法,用來(lái)跟蹤某個(gè)給定時(shí)刻哪些線程能夠運(yùn)行。真正的資源池會(huì)為新的活動(dòng)線程分配一個(gè)連接或另外某個(gè)值,并且當(dāng)這個(gè)線程工作完成時(shí)再回收這個(gè)值。在這里,資源池只是用來(lái)保存活動(dòng)線程的名,以顯示至少有兩個(gè)線程在并發(fā)運(yùn)行。

1.11 線程特定的數(shù)據(jù)
有些資源需要鎖定以便多個(gè)線程使用,另外一些資源則需要保護(hù),以使它們對(duì)并非是這些資源的“所有者”的線程隱藏。local()函數(shù)會(huì)創(chuàng)建一個(gè)對(duì)象,它能夠隱藏值,使其在不同線程中無(wú)法被看到。
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
屬性local_data.value對(duì)所有線程都不可見(jiàn),除非在某個(gè)線程中設(shè)置了這個(gè)屬性,這個(gè)線程才能看到它。

要初始化設(shè)置以使所有線程在開(kāi)始時(shí)都有相同的值,可以使用一個(gè)子類(lèi),并在_init_()中設(shè)置這些屬性。
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
class MyLocal(threading.local):
def __init__(self, value):
super().__init__()
logging.debug('Initializing %r', self)
self.value = value
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
這會(huì)在相同的對(duì)象上調(diào)用_init_()(注意id()值),每個(gè)線程中調(diào)用一次以設(shè)置默認(rèn)值。

總結(jié)
到此這篇關(guān)于Python3標(biāo)準(zhǔn)庫(kù):threading進(jìn)程中管理并發(fā)操作的文章就介紹到這了,更多相關(guān)Python3標(biāo)準(zhǔn)庫(kù):threading進(jìn)程中管理并發(fā)操作內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Python標(biāo)準(zhǔn)庫(kù)與第三方庫(kù)詳解
- Python新手學(xué)習(xí)標(biāo)準(zhǔn)庫(kù)模塊命名
- Python標(biāo)準(zhǔn)庫(kù):內(nèi)置函數(shù)max(iterable, *[, key, default])說(shuō)明
- Python3標(biāo)準(zhǔn)庫(kù)之dbm UNIX鍵-值數(shù)據(jù)庫(kù)問(wèn)題
- 淺析python標(biāo)準(zhǔn)庫(kù)中的glob
- python 第三方庫(kù)paramiko的常用方式
- Python第三方庫(kù)安裝緩慢的解決方法
- 使用豆瓣源來(lái)安裝python中的第三方庫(kù)方法
- Python基礎(chǔ)之標(biāo)準(zhǔn)庫(kù)和常用的第三方庫(kù)案例教程
相關(guān)文章
python實(shí)現(xiàn)調(diào)用攝像頭并拍照發(fā)郵箱
這篇文章主要介紹了python實(shí)現(xiàn)調(diào)用攝像頭并拍照發(fā)郵箱的程序,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下2021-04-04
探秘TensorFlow 和 NumPy 的 Broadcasting 機(jī)制
這篇文章主要介紹了探秘TensorFlow 和 NumPy 的 Broadcasting 機(jī)制,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03
Python人工智能之路 jieba gensim 最好別分家之最簡(jiǎn)單的相似度實(shí)現(xiàn)
這篇文章主要介紹了Python人工智能之路 jieba gensim 最好別分家之最簡(jiǎn)單的相似度實(shí)現(xiàn) ,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-08-08
OpenCV 輪廓檢測(cè)的實(shí)現(xiàn)方法
這篇文章主要介紹了OpenCV 輪廓檢測(cè)的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07
python用來(lái)獲得圖片exif信息的庫(kù)實(shí)例分析
這篇文章主要介紹了python用來(lái)獲得圖片exif信息的庫(kù),實(shí)例分析了exif-py庫(kù)文件的使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03
Python實(shí)現(xiàn)自動(dòng)化發(fā)送郵件
大家好,本篇文章主要講的是Python實(shí)現(xiàn)自動(dòng)化發(fā)送郵件,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下2022-01-01
使用PyInstaller將python轉(zhuǎn)成可執(zhí)行文件exe筆記
這篇文章主要介紹了使用PyInstaller將python轉(zhuǎn)成可執(zhí)行文件exe筆記,需要的朋友可以參考下2018-05-05

