詳解Python中的多線程
什么是多線程:
進(jìn)程:正在運行的程序,QQ 360 ......
線程:就是進(jìn)程中一條執(zhí)行程序的執(zhí)行路徑,一個程序至少有一條執(zhí)行路徑。(360中的殺毒 電腦體檢 電腦清理 同時運行的話就需要開啟多條路徑)
每個線程都有自己需要運行的內(nèi)容,而這些內(nèi)容可以稱為線程要執(zhí)行的任務(wù)。
開啟多線程是為了同時運行多部分代碼。
好處:解決了多部分需要同時運行的問題
弊端:如果線程過多,會導(dǎo)致效率很低(因為程序的執(zhí)行都是CPU做著隨機 快速切換來完成的)
- 線程與進(jìn)程的區(qū)別
- 線程共享內(nèi)存,進(jìn)程獨立內(nèi)存
- 線程啟動速度塊,進(jìn)程啟動速度慢,運行時速度沒有可比性
- 同一個進(jìn)程的線程間可以直接交流,兩個進(jìn)程想通信,必須通過一個中間代理來實現(xiàn)
- 創(chuàng)建新線程很簡單,創(chuàng)建新進(jìn)程需要對其父進(jìn)程進(jìn)行一次克隆
- 一個線程可以控制和操作同一線程里的其他線程,但是進(jìn)程只能操作子進(jìn)程
threading模塊
多線程的使用方式一:直接使用
# -*- coding:utf-8 -*-
# 線程使用的方式一
import threading
import time
# 需要多線程運行的函數(shù)
def fun(args):
print("我是線程%s" % args)
time.sleep(2)
print("線程%s運行結(jié)束" % args)
# 創(chuàng)建線程
t1 = threading.Thread(target=fun, args=(1,))
t2 = threading.Thread(target=fun, args=(2,))
start_time = time.time()
t1.start()
t2.start()
end_time = time.time()
print("兩個線程一共的運行時間為:", end_time-start_time)
print("主線程結(jié)束")
"""
運行結(jié)果:
我是線程1
我是線程2兩個線程一共的運行時間為: 0.0010077953338623047
主線程結(jié)束
線程1運行結(jié)束
線程2運行結(jié)束
"""線程的第二種使用方式:繼承式調(diào)用
# 繼承式調(diào)用
import threading
import time
class MyThreading(threading.Thread):
def __init__(self, name):
super(MyThreading, self).__init__()
self.name = name
# 線程要運行的代碼
def run(self):
print("我是線程%s" % self.name)
time.sleep(2)
print("線程%s運行結(jié)束" % self.name)
t1 = MyThreading(1)
t2 = MyThreading(2)
start_time = time.time()
t1.start()
t2.start()
end_time = time.time()
print("兩個線程一共的運行時間為:", end_time-start_time)
print("主線程結(jié)束")
"""
運行結(jié)果:
我是線程1
我是線程2
兩個線程一共的運行時間為: 0.0010724067687988281
主線程結(jié)束
線程2運行結(jié)束
線程1運行結(jié)束
"""守護線程與join方法
- 在Python多線程編程中,join方法的作用式線程同步。
- 守護線程,是為守護別人而存在的,當(dāng)設(shè)置為守護線程后,被守護的主線程不存在后,守護線程也自然不存在。
- 第一種:python多線程默認(rèn)情況
- Python多線程默認(rèn)情況(設(shè)置線程setDaemon(False)),主線程執(zhí)行完自己的任務(wù)后,就退出了,此時子線程會繼續(xù)執(zhí)行自己的任務(wù),直到子線程任務(wù)結(jié)束
- 代碼演示:threading中的兩個創(chuàng)建多線成的例子都是。
- 第二種:開啟守護線程
- 開啟線程的setDaemon(True)),設(shè)置子線程為守護線程,實現(xiàn)主程序結(jié)束,子程序立馬全部結(jié)束功能
- 代碼演示:
# 守護線程
import threading
import time
class MyThreading(threading.Thread):
def __init__(self, name):
super(MyThreading, self).__init__()
self.name = name
# 線程要運行的代碼
def run(self):
print("我是線程%s" % self.name)
time.sleep(2)
print("線程%s運行結(jié)束" % self.name)
t1 = MyThreading(1)
t2 = MyThreading(2)
start_time = time.time()
t1.setDaemon(True)
t1.start()
t2.setDaemon(True)
t2.start()
end_time = time.time()
print("兩個線程一共的運行時間為:", end_time-start_time)
print("主線程結(jié)束")- 注意:如果要設(shè)置為守護線程,一定要在開啟線程之前,將該線程設(shè)置為守護線程
- 結(jié)論:主線程結(jié)束后,無論子線程1,2是否運行完成,都結(jié)束線程,不在繼續(xù)向下運行
- 第三種:加入join方法設(shè)置同步
- 當(dāng)不給程序設(shè)置守護進(jìn)程時,主程序?qū)⒁恢钡却映绦蛉窟\行完成才結(jié)束
- 代碼演示:
# join:線程同步
import threading
import time
class MyThreading(threading.Thread):
def __init__(self, name):
super(MyThreading, self).__init__()
self.name = name
# 線程要運行的代碼
def run(self):
print("我是線程%s" % self.name)
time.sleep(3)
print("線程%s運行結(jié)束" % self.name)
threading_list = []
start_time = time.time()
for x in range(50):
t = MyThreading(x)
t.start()
threading_list.append(t)
for x in threading_list:
x.join() # 為線程開啟同步
end_time = time.time()
print("50個線程一共的運行時間為:", end_time-start_time)
print("主線程結(jié)束")結(jié)論:主線程等待50個子線程全部執(zhí)行完成才結(jié)束。
線程鎖(互斥鎖Mutex)
- 一個進(jìn)程下可以啟用多個線程,多個線程共享父進(jìn)程的內(nèi)存空間,也就意味著每個線程可以訪問同一份數(shù)據(jù),此時如果多個線程同時要修改一份數(shù)據(jù),會出現(xiàn)什么狀況?
- 代碼演示:
# -*- coding:utf8 -*-
import threading
import time
num = 100
threading_list = []
def fun():
global num
print("get num:", num)
num += 1
time.sleep(1)
for x in range(200):
t = threading.Thread(target=fun)
t.start()
threading_list.append(t)
for x in threading_list:
x.join()
print("nun:", num)- 結(jié)論:運行結(jié)果可能會出現(xiàn)num<300的情況
- 正常來講,這個num結(jié)果應(yīng)該是300, 但在python 2.7上多運行幾次,會發(fā)現(xiàn),最后打印出來的num結(jié)果不總是300,為什么每次運行的結(jié)果不一樣呢? 哈,很簡單,假設(shè)你有A,B兩個線程,此時都 要對num 進(jìn)行加1操作, 由于2個線程是并發(fā)同時運行的,所以2個線程很有可能同時拿走了num=100這個初始變量交給cpu去運算,當(dāng)A線程去處完的結(jié)果是101,但此時B線程運算完的結(jié)果也是101,兩個線程同時CPU運算的結(jié)果再賦值給num變量后,結(jié)果就都是101。那怎么辦呢? 很簡單,每個線程在要修改公共數(shù)據(jù)時,為了避免自己在還沒改完的時候別人也來修改此數(shù)據(jù),可以給這個數(shù)據(jù)加一把鎖, 這樣其它線程想修改此數(shù)據(jù)時就必須等待你修改完畢并把鎖釋放掉后才能再訪問此數(shù)據(jù)。
*注:不要在3.x上運行,不知為什么,3.x上的結(jié)果總是正確的,可能是自動加了鎖
加鎖版本:
import random
import threading
import time
num = 100
threading_list = []
def fun():
global num
time.sleep(random.random())
lock.acquire() # 加鎖
print("get num:", num, threading.current_thread())
num += 1
lock.release() # 釋放鎖
# 實例化鎖對象
lock = threading.Lock()
for x in range(200):
t = threading.Thread(target=fun)
t.start()
threading_list.append(t)
for x in threading_list:
x.join()
print("num:", num)GIL VS Lock
機智的同學(xué)可能會問到這個問題,就是既然你之前說過了,Python已經(jīng)有一個GIL來保證同一時間只能有一個線程來執(zhí)行了,為什么這里還需要lock? 注意啦,這里的lock是用戶級的lock,跟那個GIL沒關(guān)系 ,具體我們通過下圖來看一下+配合我現(xiàn)場講給大家,就明白了。

那你又問了, 既然用戶程序已經(jīng)自己有鎖了,那為什么C python還需要GIL呢?加入GIL主要的原因是為了降低程序的開發(fā)的復(fù)雜度,比如現(xiàn)在的你寫python不需要關(guān)心內(nèi)存回收的問題,因為Python解釋器幫你自動定期進(jìn)行內(nèi)存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內(nèi)存數(shù)據(jù)是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是并發(fā)運行的,假設(shè)你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內(nèi)存空間賦值了,結(jié)果就有可能新賦值的數(shù)據(jù)被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當(dāng)一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這可以說是Python早期版本的遺留問題。
RLock(遞歸鎖)
說白了就是在一個大鎖中還要再包含子鎖
import threading, time
def run1():
lock.acquire()
print("grab the first part data")
global num
num += 1
lock.release()
return num
def run2():
lock.acquire()
print("grab the second part data")
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock()
for i in range(3):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)在開發(fā)的過程中要注意有些操作默認(rèn)都是 線程安全的(內(nèi)部集成了鎖的機制),我們在使用的時無需再通過鎖再處理,例如:
import threading
data_list = []
lock_object = threading.RLock()
def task():
print("開始")
for i in range(1000000):
data_list.append(i)
print(len(data_list))
for i in range(2):
t = threading.Thread(target=task)
t.start()
Semaphore(信號量)
- 互斥鎖同時只允許一個線程修改數(shù)據(jù),而Semaphore是同時允許一定數(shù)量的線程修改數(shù)據(jù),比如廁所有三個坑,那最多只允許三個人上廁所,后面的人只能等前面的人出來才能進(jìn)去。
- 代碼演示:
# -*- coding:GBK -*-
import threading
import time
sum_1 = 0
def run(i):
global sum_1
time.sleep(1)
# lock.acquire()
semaphore.acquire()
sum_1 += 1
print("線程%s來了,并修改了sum_1的值為:%s" % (i, sum_1))
semaphore.release()
# lock.release()
# lock = threading.Lock()
semaphore = threading.BoundedSemaphore(5)
for x in range(10):
t = threading.Thread(target=run, args=(x,))
t.start()
while threading.active_count() != 1:
pass
print("程序結(jié)束")Event(事件)
- 通過Event來實現(xiàn)兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規(guī)則。
- 四個常用方法
set() # 設(shè)置標(biāo)志位為 True
clear() # 清空標(biāo)志位(將標(biāo)志位改為false)
is_set() # 檢測標(biāo)志位,如果標(biāo)志位被設(shè)置,返回True,否則返回False
wait() # 等待標(biāo)志位被設(shè)置位True程序才繼續(xù)往下運行
代碼演示:
# -*- coding:utf-8 -*-
import threading
import time
def light():
count = 1
event.set() # 設(shè)置標(biāo)志位 True
while True:
if count <= 10:
print("現(xiàn)在是綠燈")
time.sleep(1)
elif count <= 15:
print("現(xiàn)在是紅燈")
event.clear() # 清空標(biāo)志位(將標(biāo)志位改為false)
time.sleep(1)
else:
count = 0
event.set()
count += 1
def car(name):
while True:
if event.is_set():
print("----------%s在起飛-------------" % name)
time.sleep(1)
else:
print("---------%s在等紅燈---------------" % name)
event.wait() # 等待標(biāo)志位被設(shè)置位True程序才繼續(xù)往下運行
event = threading.Event()
light_1 = threading.Thread(target=light)
light_1.start()
for x in range(5):
car_1 = threading.Thread(target=car, args=("馬自達(dá)"+str(x),))
car_1.start()紅綠燈案例
Queue(隊列)
queue.Queue(maxsize=0) #隊列:先進(jìn)先出 maxsize:設(shè)置隊列的大小 queue.LifoQueue(maxsize=0) ##last in fisrt out maxsize:設(shè)置隊列的大小 queue.PriorityQueue(maxsize=0) #存儲數(shù)據(jù)時可設(shè)置優(yōu)先級的隊列,按優(yōu)先級順序(最低的先) maxsize:設(shè)置隊列的大小
exceptionqueue.Empty
Exception raised when non-blocking get()(or get_nowait()) is called on a Queue object which is empty.
當(dāng)在一個空的隊列對象上調(diào)用非阻塞的get()(或get_nowait())時,會產(chǎn)生異常。
exceptionqueue.Full
Exception raised when non-blocking put() (or put_nowait() ) is called on a Queue object which is full.
當(dāng)非阻塞的put()(或put_nowait())被調(diào)用到一個已滿的隊列對象上時引發(fā)的異常。
import queue
# 實例化隊列對象
q = queue.Queue(3)
print(q.qsize()) # 獲取隊列內(nèi)數(shù)據(jù)的長度
print(q.empty()) # 如果隊列是空的,返回True,否則返回False(不可靠!)
print(q.full()) # 如果隊列已滿,返回True,否則返回False(不可靠?。?
"""
Queue.put(item, block=True, timeout=None)
可以簡寫:Queue.put(item, True, 5)
將項目放入隊列。
如果可選的args block為true(默認(rèn)值),并且timeout為None(默認(rèn)值),必要時進(jìn)行阻塞,直到有空閑的槽。
如果timeout是一個正數(shù),它最多阻斷timeout秒,如果在這段時間內(nèi)沒有空閑槽,則引發(fā)Full異常。
否則(block為false),如果有空閑的槽立即可用,就在隊列上放一個項目,否則就引發(fā)Full異常(在這種情況下忽略超時)。
"""
q.put(1) # 數(shù)據(jù)“1”進(jìn)入隊列
q.put("nihao") # 數(shù)據(jù)"nihao"進(jìn)入隊列
q.put("456ni", block=True, timeout=5)
'''
將一個項目放入隊列中,不進(jìn)行阻斷。
只有在有空閑位置的情況下才排隊。
否則會引發(fā)Full異常。
'''
# q.put_nowait(123)
'''
Queue.get(block=True, timeout=None)
可以簡寫:Queue.get(True, 3)
從隊列中刪除并返回一個項目。
如果可選的args'block'為True(默認(rèn)),'timeout'為無(默認(rèn))。
就會在必要時阻塞,直到有一個項目可用。
如果'timeout'是非負(fù)數(shù),它最多阻斷'timeout'秒,如果在這段時間內(nèi)沒有項目可用,則引發(fā)Empty異常。
否則('block'為False),如果有一個項目立即可用,則返回一個項目。
否則引發(fā)Empty異常('timeout'被忽略了在這種情況下)。
'''
print(q.get())
print(q.get())
print(q.get())
print(q.get(block=True, timeout=2))
'''
從隊列中移除并返回一個項目,而不阻塞。
只有當(dāng)一個項目立即可用時,才會得到一個項目。
否則引發(fā)Empty異常。
'''
# print(q.get_nowait())生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
# 生產(chǎn)者/消費者
import threading
import queue
import time
# 生產(chǎn)者
def producer(name):
count = 1
while True:
p.put("{}骨頭{}".format(name, count))
print("骨頭{}被{}生產(chǎn)".format(count, name).center(60, "*"))
count += 1
time.sleep(0.1)
# 消費者
def consumer(name):
while True:
print("{}被{}吃掉了".format(p.get(), name))
# 實例化隊列對象
p = queue.Queue(10)
# 創(chuàng)建生產(chǎn)者線程
producer_threading1 = threading.Thread(target=producer, args=("飛某人",))
producer_threading2 = threading.Thread(target=producer, args=("Alex",))
# 創(chuàng)建消費者線程
consumer_threading1 = threading.Thread(target=consumer, args=("張三",))
consumer_threading2 = threading.Thread(target=consumer, args=("李四",))
producer_threading1.start()
producer_threading2.start()
consumer_threading1.start()
consumer_threading2.start()線程池
Python3中官方才正式提供線程池。
線程不是開的越多越好,開的多了可能會導(dǎo)致系統(tǒng)的性能更低了,例如:如下的代碼是不推薦在項目開發(fā)中編寫。
import threading
def task(video_url):
pass
url_list = ["www.xxxx-{}.com".format(i) for i in range(30000)]
for url in url_list:
t = threading.Thread(target=task, args=(url,))
t.start()
# 這種每次都創(chuàng)建一個線程去操作,創(chuàng)建任務(wù)的太多,線程就會特別多,可能效率反倒降低了。建議:使用線程池
import time
from concurrent.futures import ThreadPoolExecutor # 并行期貨,線程池執(zhí)行者
"""
pool = ThreadPoolExecutor(100)
pool.submit(函數(shù)名,參數(shù)1,參數(shù)2,參數(shù)...)
"""
def task(video_url, num):
print("開始執(zhí)行任務(wù)", video_url, num) # 開始執(zhí)行任務(wù) www.xxxx-299.com 3
time.sleep(1)
# 創(chuàng)建線程池,最多維護10個線程
threadpool = ThreadPoolExecutor(10)
# 生成300網(wǎng)址,并放入列表
url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]
for url in url_list:
"""
在線程池中提交一個任務(wù),線程池如果有空閑線程,則分配一個線程去執(zhí)行,執(zhí)行完畢后在將線程交還給線程池,
如果沒有空閑線程,則等待。注意在等待時,與主線程無關(guān),主線程依然在繼續(xù)執(zhí)行。
"""
threadpool.submit(task, url, 3)
print("等待線程池中的任務(wù)執(zhí)行完畢中······")
threadpool.shutdown(True) # 等待線程池中的任務(wù)執(zhí)行完畢后,在繼續(xù)執(zhí)行
print("END")

任務(wù)執(zhí)行完任務(wù),再干點其他事:
"""線程池的回調(diào)"""
import time
import random
from concurrent.futures import ThreadPoolExecutor
def task(video_url):
print("開始執(zhí)行任務(wù)", video_url)
time.sleep(1)
return random.randint(0, 10) # 將結(jié)果封裝成一個Futuer對象,返回給線程池
def done(response): # response就是futuer對象,也就是task的返回值分裝的一個Futuer對象
print("任務(wù)執(zhí)行完后,回調(diào)的函數(shù)", response.result()) # 即Futuer.result():取出task的返回值
# 創(chuàng)建線程池
threadpool = ThreadPoolExecutor(10)
url_list = ["www.xxxx-{}.com".format(i) for i in range(5)]
for url in url_list:
futuer = threadpool.submit(task, url) # futuer是由task返回的一個Future對象,里面有記錄task的返回值
futuer.add_done_callback(done) # 回調(diào)done函數(shù),執(zhí)行者依然是子線程
# 優(yōu)點:可以做分工,例如:task專門下載,done專門將下載的數(shù)據(jù)寫入本地文件。到此這篇關(guān)于詳解Python中的多線程的文章就介紹到這了,更多相關(guān)Python多線程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Pandas實現(xiàn)清洗客戶編碼異常數(shù)據(jù)
在不同行業(yè)中,我們經(jīng)常會遇到一個麻煩的問題:數(shù)據(jù)清洗,尤其是當(dāng)我們需要處理客戶編碼異常數(shù)據(jù)時,下面小編就來和大家分享一下常用的解決辦法吧2023-07-07
Python數(shù)據(jù)可視化之用Matplotlib繪制常用圖形
Matplotlib能夠繪制折線圖、散點圖、柱狀圖、直方圖、餅圖. 我們需要知道不同的統(tǒng)計圖的意義,以此來決定選擇哪種統(tǒng)計圖來呈現(xiàn)我們的數(shù)據(jù),今天就帶大家詳細(xì)了解如何繪制這些常用圖形,需要的朋友可以參考下2021-06-06
Python實現(xiàn)求最大公約數(shù)及判斷素數(shù)的方法
這篇文章主要介紹了Python實現(xiàn)求最大公約數(shù)及判斷素數(shù)的方法,涉及Python算數(shù)運算的相關(guān)技巧,需要的朋友可以參考下2015-05-05
Tensorflow 模型轉(zhuǎn)換 .pb convert to .lite實例
今天小編就為大家分享一篇Tensorflow 模型轉(zhuǎn)換 .pb convert to .lite實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-02-02
對DataFrame數(shù)據(jù)中的重復(fù)行,利用groupby累加合并的方法詳解
今天小編就為大家分享一篇對DataFrame數(shù)據(jù)中的重復(fù)行,利用groupby累加合并的方法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01
Python調(diào)用騰訊API實現(xiàn)人臉身份證比對功能
這篇文章主要介紹了Python調(diào)用騰訊API進(jìn)行人臉身份證比對,簡單介紹了調(diào)用騰訊云API步驟,通過完整代碼展示與結(jié)果,需要的朋友可以參考下2022-04-04

