詳解Python中的多線程編程
一、簡介
多線程編程技術(shù)可以實現(xiàn)代碼并行性,優(yōu)化處理能力,同時功能的更小劃分可以使代碼的可重用性更好。Python中threading和Queue模塊可以用來實現(xiàn)多線程編程。
二、詳解
1、線程和進程
進程(有時被稱為重量級進程)是程序的一次執(zhí)行。每個進程都有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其它記錄其運行軌跡的輔助數(shù)據(jù)。操作系統(tǒng)管理在其上運行的所有進程,并為這些進程公平地分配時間。進程也可以通過fork和spawn操作來完成其它的任務(wù),不過各個進程有自己的內(nèi)存空間、數(shù)據(jù)棧等,所以只能使用進程間通訊(IPC),而不能直接共享信息。
線程(有時被稱為輕量級進程)跟進程有些相似,不同的是所有的線程運行在同一個進程中,共享相同的運行環(huán)境。它們可以想像成是在主進程或“主線程”中并行運行的“迷你進程”。線程有開始、順序執(zhí)行和結(jié)束三部分,它有一個自己的指令指針,記錄自己運行到什么地方。線程的運行可能被搶占(中斷)或暫時的被掛起(也叫睡眠)讓其它的線程運行,這叫做讓步。一個進程中的各個線程之間共享同一片數(shù)據(jù)空間,所以線程之間可以比進程之間更方便地共享數(shù)據(jù)以及相互通訊。線程一般都是并發(fā)執(zhí)行的,正是由于這種并行和數(shù)據(jù)共享的機制使得多個任務(wù)的合作變?yōu)榭赡?。實際上,在單CPU的系統(tǒng)中,真正的并發(fā)是不可能的,每個線程會被安排成每次只運行一小會,然后就把CPU讓出來,讓其它的線程去運行。在進程的整個運行過程中,每個線程都只做自己的事,在需要的時候跟其它的線程共享運行的結(jié)果。多個線程共同訪問同一片數(shù)據(jù)不是完全沒有危險的,由于數(shù)據(jù)訪問的順序不一樣,有可能導(dǎo)致數(shù)據(jù)結(jié)果的不一致的問題,這叫做競態(tài)條件。而大多數(shù)線程庫都帶有一系列的同步原語,來控制線程的執(zhí)行和數(shù)據(jù)的訪問。
2、使用線程
(1)全局解釋器鎖(GIL)
Python代碼的執(zhí)行由Python虛擬機(也叫解釋器主循環(huán))來控制。Python在設(shè)計之初就考慮到要在主循環(huán)中,同時只有一個線程在執(zhí)行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。在多線程環(huán)境中,Python 虛擬機按以下方式執(zhí)行:a、設(shè)置 GIL;b、切換到一個線程去運行;c、運行指定數(shù)量的字節(jié)碼指令或者線程主動讓出控制(可以調(diào)用 time.sleep(0));d、把線程設(shè)置為睡眠狀態(tài);e、解鎖 GIL;d、再次重復(fù)以上所有步驟。
在調(diào)用外部代碼(如 C/C++擴展函數(shù))的時候,GIL將會被鎖定,直到這個函數(shù)結(jié)束為止(由于在這期間沒有Python的字節(jié)碼被運行,所以不會做線程切換)編寫擴展的程序員可以主動解鎖GIL。
(2)退出線程
當一個線程結(jié)束計算,它就退出了。線程可以調(diào)用thread.exit()之類的退出函數(shù),也可以使用Python退出進程的標準方法,如sys.exit()或拋出一個SystemExit異常等。不過,不可以直接“殺掉”("kill")一個線程。
不建議使用thread模塊,很明顯的一個原因是,當主線程退出的時候,所有其它線程沒有被清除就退出了。另一個模塊threading就能確保所有“重要的”子線程都退出后,進程才會結(jié)束。
(3)Python的線程模塊
Python提供了幾個用于多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程序員創(chuàng)建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創(chuàng)建一個可以用于多個線程之間共享數(shù)據(jù)的隊列數(shù)據(jù)結(jié)構(gòu)。
避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊里的屬性有可能會與threading出現(xiàn)沖突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結(jié)束時,所有的線程都會被強制結(jié)束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出后進程才退出。
3、thread模塊
thread模塊除了產(chǎn)生線程外,thread模塊也提供了基本的同步數(shù)據(jù)結(jié)構(gòu)鎖對象(lock object也叫原語鎖、簡單鎖、互斥鎖、互斥量、二值信號量)。同步原語與線程的管理是密不可分的。
常用的線程函數(shù)以及LockType類型的鎖對象的方法:

#!/usr/bin/env python
import thread
from time import sleep, ctime
def loop0():
print '+++start loop 0 at:', ctime()
sleep(4)
print '+++loop 0 done at:', ctime()
def loop1():
print '***start loop 1 at:', ctime()
sleep(2)
print '***loop 1 done at:', ctime()
def main():
print '------starting at:', ctime()
thread.start_new_thread(loop0, ())
thread.start_new_thread(loop1, ())
sleep(6)
print '------all DONE at:', ctime()
if __name__ == '__main__':
main()
thread 模塊提供的簡單的多線程的機制,兩個循環(huán)并發(fā)地被執(zhí)行,總的運行時間為最慢的那個線程的運行時間(主線程6s),而不是所有的線程的運行時間之和。start_new_thread()要求要有前兩個參數(shù),就算想要運行的函數(shù)不要參數(shù),也要傳一個空的元組。

sleep(6)是讓主線程停下來,主線程一旦運行結(jié)束,就關(guān)閉運行著其他兩個線程。但這可能造成主線程過早或過晚退出,那就要使用線程鎖,可以在兩個子線程都退出后,主線程立即退出。
在CODE上查看代碼片派生到我的代碼片
#!/usr/bin/env python
import thread
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec, lock):
print '+++start loop:', nloop, 'at:', ctime()
sleep(nsec)
print '+++loop:', nloop, 'done at:', ctime()
lock.release()
def main():
print '---starting threads...'
locks = []
nloops = range(len(loops))
for i in nloops:
lock = thread.allocate_lock()
lock.acquire()
locks.append(lock)
for i in nloops:
thread.start_new_thread(loop,
(i, loops[i], locks[i]))
for i in nloops:
while locks[i].locked(): pass
print '---all DONE at:', ctime()
if __name__ == '__main__':
main()

4、threading模塊
更高級別的threading模塊,它不僅提供了Thread類,還提供了各種非常好用的同步機制。threading 模塊里所有的對象:

thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的服務(wù)器,如果沒有客戶提出請求它就在那等著,如果設(shè)定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。如果主線程退出不用等待那些子線程完成,那就設(shè)定這些線程的daemon屬性,即在線程thread.start()開始前,調(diào)用setDaemon()函數(shù)設(shè)定線程的daemon標志(thread.setDaemon(True))就表示這個線程“不重要”。如果想要等待子線程完成再退出,那就什么都不用做或者顯式地調(diào)用thread.setDaemon(False)以保證其daemon標志為False,可以調(diào)用thread.isDaemon()函數(shù)來判斷其daemon標志的值。新的子線程會繼承其父線程的daemon標志,整個Python會在所有的非守護線程退出后才會結(jié)束,即進程中沒有非守護線程存在的時候才結(jié)束。
(1)threading的Thread類
它有很多thread模塊里沒有的函數(shù),Thread對象的函數(shù):

創(chuàng)建一個Thread的實例,傳給它一個函數(shù)
在CODE上查看代碼片派生到我的代碼片
#!/usr/bin/env python
import threading
from time import sleep, ctime
loops = [ 4, 2 ]
def loop(nloop, nsec):
print '+++start loop:', nloop, 'at:', ctime()
sleep(nsec)
print '+++loop:', nloop, 'done at:', ctime()
def main():
print '---starting at:', ctime()
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop,
args=(i, loops[i]))
threads.append(t)
for i in nloops: # start threads
threads[i].start()
for i in nloops: # wait for all
threads[i].join() # threads to finish
print '---all DONE at:', ctime()
if __name__ == '__main__':
main()
實例化一個Thread(調(diào)用 Thread())與調(diào)用thread.start_new_thread()之間最大的區(qū)別就是,新的線程不會立即開始。在創(chuàng)建線程對象,但不想馬上開始運行線程的時候,這是一個很有用的同步特性。所有的線程都創(chuàng)建了之后,再一起調(diào)用 start()函數(shù)啟動,而不是創(chuàng)建一個啟動一個。而且也不用再管理一堆鎖(分配鎖、獲得鎖、釋放鎖、檢查鎖的狀態(tài)等),只要簡單地對每個線程調(diào)用join()主線程等待子線程的結(jié)束即可。join()還可以設(shè)置timeout的參數(shù),即主線程等到超時為止。
join()的另一個比較重要的方面是它可以完全不用調(diào)用,一旦線程啟動后,就會一直運行,直到線程的函數(shù)結(jié)束,退出為止。如果主線程除了等線程結(jié)束外,還有其它的事情要做,那就不用調(diào)用 join(),只有在等待線程結(jié)束的時候才調(diào)用join()。
創(chuàng)建一個Thread的實例,傳給它一個可調(diào)用的類對象
[html] view plaincopy在CODE上查看代碼片派生到我的代碼片
#!/usr/bin/env python
import threading
from time import sleep, ctime
loops = [ 4, 2 ]
class ThreadFunc(object):
def __init__(self, func, args, name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
apply(self.func, self.args)
def loop(nloop, nsec):
print 'start loop', nloop, 'at:', ctime()
sleep(nsec)
print 'loop', nloop, 'done at:', ctime()
def main():
print 'starting at:', ctime()
threads = []
nloops = range(len(loops))
for i in nloops: # create all threads
t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops: # start all threads
threads[i].start()
for i in nloops: # wait for completion
threads[i].join()
print 'all DONE at:', ctime()
if __name__ == '__main__':
main()
與傳一個函數(shù)很相似的另一個方法是在創(chuàng)建線程的時候,傳一個可調(diào)用的類的實例供線程啟動的時候執(zhí)行,這是多線程編程的一個更為面向?qū)ο蟮姆椒?。相對于一個或幾個函數(shù)來說,類對象里可以使用類的強大的功能。創(chuàng)建新線程的時候,Thread對象會調(diào)用ThreadFunc對象,這時會用到一個特殊函數(shù)__call__()。由于已經(jīng)有了要用的參數(shù),所以就不用再傳到Thread()的構(gòu)造函數(shù)中。由于有一個參數(shù)的元組,這時要使用apply()函數(shù)或使用self.res = self.func(*self.args)。
從Thread派生出一個子類,創(chuàng)建一個這個子類的實例
在CODE上查看代碼片派生到我的代碼片
#!/usr/bin/env python
import threading
from time import sleep, ctime
loops = [ 4, 2 ]
class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
print 'starting', self.name, 'at:', ctime()
self.res = apply(self.func, self.args)
print self.name, 'finished at:', ctime()
def loop(nloop, nsec):
print 'start loop', nloop, 'at:', ctime()
sleep(nsec)
print 'loop', nloop, 'done at:', ctime()
def main():
print 'starting at:', ctime()
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop, (i, loops[i]),
loop.__name__)
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print 'all DONE at:', ctime()
if __name__ == '__main__':
main()
子類化Thread類,MyThread子類的構(gòu)造函數(shù)一定要先調(diào)用基類的構(gòu)造函數(shù),特殊函數(shù)__call__()在子類中,名字要改為run()。在 MyThread類中,加入一些用于調(diào)試的輸出信息,把代碼保存到myThread模塊中,并導(dǎo)入這個類。除使用apply()函數(shù)來運行這些函數(shù)之外,還可以把結(jié)果保存到實現(xiàn)的self.res屬性中,并創(chuàng)建一個新的函數(shù)getResult()來得到結(jié)果。
(2)threading模塊中的其它函數(shù)

5、Queue模塊
常用的 Queue 模塊的屬性:

Queue模塊可以用來進行線程間通訊,讓各個線程之間共享數(shù)據(jù)。Queue解決生產(chǎn)者-消費者的問題,現(xiàn)在創(chuàng)建一個隊列,讓生產(chǎn)者線程把新生產(chǎn)的貨物放進去供消費者線程使用。生產(chǎn)者生產(chǎn)貨物所要花費的時間無法預(yù)先確定,消費者消耗生產(chǎn)者生產(chǎn)的貨物的時間也是不確定的。
在CODE上查看代碼片派生到我的代碼片
#!/usr/bin/env python
from random import randint
from time import sleep
from Queue import Queue
from myThread import MyThread
def writeQ(queue):
print '+++producing object for Q...',
queue.put('xxx', 1)
print "+++size now:", queue.qsize()
def readQ(queue):
val = queue.get(1)
print '---consumed object from Q... size now', \
queue.qsize()
def writer(queue, loops):
for i in range(loops):
writeQ(queue)
sleep(randint(1, 3))
def reader(queue, loops):
for i in range(loops):
readQ(queue)
sleep(randint(2, 5))
funcs = [writer, reader]
nfuncs = range(len(funcs))
def main():
nloops = randint(2, 5)
q = Queue(32)
threads = []
for i in nfuncs:
t = MyThread(funcs[i], (q, nloops), \
funcs[i].__name__)
threads.append(t)
for i in nfuncs:
threads[i].start()
for i in nfuncs:
threads[i].join()
print '***all DONE'
if __name__ == '__main__':
main()

這個實現(xiàn)中使用了Queue對象和隨機地生產(chǎn)(和消耗)貨物的方式。生產(chǎn)者和消費者相互獨立并且并發(fā)地運行,它們不一定是輪流執(zhí)行的(隨機數(shù)模擬)。writeQ()和readQ()函數(shù)分別用來把對象放入隊列和消耗隊列中的一個對象,在這里使用字符串'xxx'來表示隊列中的對象。writer()函數(shù)就是一次往隊列中放入一個對象,等待一會然后再做同樣的事,一共做指定的次數(shù),這個次數(shù)是由腳本運行時隨機生成的。reader()函數(shù)做的事比較類似,只是它是用來消耗對象的。
6、線程相關(guān)模塊
多線程相關(guān)的標準庫模塊:

三、總結(jié)
(1)一個要完成多項任務(wù)的程序,可以考慮每個任務(wù)使用一個線程,這樣的程序在設(shè)計上相對于單線程做所有事的程序來說,更為清晰明了。
(2)單線程的程序在程序性能上的限制,尤其在有相互獨立、運行時間不確定、多個任務(wù)的程序里,而把多個任務(wù)分隔成多個線程同時運行會比順序運行速度更快。由于Python解釋器是單線程的,所以不是所有的程序都能從多線程中得到好處。
(3)若有不足,請留言,在此先感謝!
相關(guān)文章
Python Spyder 調(diào)出縮進對齊線的操作
這篇文章主要介紹了Python Spyder 調(diào)出縮進對齊線的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
手把手教你使用Django + Vue.js 快速構(gòu)建項目
本篇將基于Django + Vue.js,手把手教大家快速的實現(xiàn)一個前后端分離的Web項目。文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08
解決TypeError: Object of type xxx is&
這篇文章主要介紹了解決TypeError: Object of type xxx is not JSON serializable錯誤問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06

