簡單學(xué)習(xí)Python多進(jìn)程Multiprocessing
1.1 什么是 Multiprocessing
多線程在同一時(shí)間只能處理一個(gè)任務(wù)。
可把任務(wù)平均分配給每個(gè)核,而每個(gè)核具有自己的運(yùn)算空間。
1.2 添加進(jìn)程 Process
與線程類似,如下所示,但是該程序直接運(yùn)行無結(jié)果,因?yàn)镮DLE不支持多進(jìn)程,在命令行終端運(yùn)行才有結(jié)果顯示
import multiprocessing as mp
def job(a,b):
print('abc')
if __name__=='__main__':
p1=mp.Process(target=job,args=(1,2))
p1.start()
p1.join()
1.3 存儲進(jìn)程輸出 Queue
不知道為什么下面的這個(gè)程序可以在IDLE中正常運(yùn)行。首先定義了一個(gè)job函數(shù)作系列數(shù)學(xué)運(yùn)算,然后將結(jié)果放到res中,在main函數(shù)運(yùn)行,取出queue中存儲的結(jié)果再進(jìn)行一次加法運(yùn)算。
import multiprocessing as mp def job(q): res=0 for i in range(1000): res+=i+i**2+i**3 q.put(res) if __name__ == '__main__': q=mp.Queue() p1 = mp.Process(target=job,args=(q,))#注意當(dāng)參數(shù)只有一個(gè)時(shí),應(yīng)加上逗號 p2 = mp.Process(target=job,args=(q,)) p1.start() p2.start() p1.join() p2.join() res1=q.get() res2=q.get() print(res1+res2)
結(jié)果如下所示:

1.4 效率比對 threading & multiprocessing
在job函數(shù)中定義了數(shù)學(xué)運(yùn)算,比較正常情況、多線程和多進(jìn)程分別的運(yùn)行時(shí)間。
import multiprocessing as mp
import threading as td
import time
def job(q):
res = 0
for i in range(10000000):
res += i+i**2+i**3
q.put(res) # queue
def multicore():
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:' , res1+res2)
def normal():
res = 0
for _ in range(2):#線程或進(jìn)程都構(gòu)造了兩個(gè),進(jìn)行了兩次運(yùn)算,所以這里循環(huán)兩次
for i in range(10000000):
res += i+i**2+i**3
print('normal:', res)
def multithread():
q = mp.Queue()
t1 = td.Thread(target=job, args=(q,))
t2 = td.Thread(target=job, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:', res1+res2)
if __name__ == '__main__':
st = time.time()
normal()
st1= time.time()
print('normal time:', st1 - st)
multithread()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore()
print('multicore time:', time.time()-st2)
在視頻中的運(yùn)行結(jié)果是多進(jìn)程<正常<多線程,而我的運(yùn)行結(jié)果為下圖所示:

綜上,多核/多進(jìn)程運(yùn)行最快,說明在同時(shí)間運(yùn)行了多個(gè)任務(wù),而多線程卻不一定會比正常情況下的運(yùn)行來的快,這和多線程中的GIL有關(guān)。
1.5 進(jìn)程池
進(jìn)程池Pool,就是我們將所要運(yùn)行的東西,放到池子里,Python會自行解決多進(jìn)程的問題。
import multiprocessing as mp def job(x): return x*x def multicore(): pool=mp.Pool(processes=2)#定義一個(gè)Pool,并定義CPU核數(shù)量為2 res=pool.map(job,range(10)) print(res) res=pool.apply_async(job,(2,)) print(res.get()) multi_res=[pool.apply_async(job,(i,)) for i in range(10)] print([res.get()for res in multi_res]) if __name__=='__main__': multicore()
運(yùn)行結(jié)果如下所示:

首先定義一個(gè)池子,有了池子之后,就可以讓池子對應(yīng)某一個(gè)函數(shù),在上述代碼中定義的pool對應(yīng)job函數(shù)。我們向池子里丟數(shù)據(jù),池子就會返回函數(shù)返回的值。 Pool和之前的Process的不同點(diǎn)是丟向Pool的函數(shù)有返回值,而Process的沒有返回值。
接下來用map()獲取結(jié)果,在map()中需要放入函數(shù)和需要迭代運(yùn)算的值,然后它會自動分配給CPU核,返回結(jié)果

我們怎么知道Pool是否真的調(diào)用了多個(gè)核呢?我們可以把迭代次數(shù)增大些,然后打開CPU負(fù)載看下CPU運(yùn)行情況
打開CPU負(fù)載(Mac):活動監(jiān)視器 > CPU > CPU負(fù)載(單擊一下即可)
Pool默認(rèn)大小是CPU的核數(shù),我們也可以通過在Pool中傳入processes參數(shù)即可自定義需要的核數(shù)量。
Pool除了可以用map來返回結(jié)果之外,還可以用apply_async(),與map不同的是,只能傳遞一個(gè)值,只會放入一個(gè)核進(jìn)行計(jì)算,但是傳入值時(shí)要注意是可迭代的,所以在傳入值后需要加逗號, 同時(shí)需要用get()方法獲取返回值。所對應(yīng)的代碼為:
res=pool.apply_async(job,(2,)) print(res.get())
運(yùn)行結(jié)果為4。
由于傳入值是可以迭代的,則我們同樣可以使用apply_async()來輸出多個(gè)結(jié)果。如果在apply_async()中輸入多個(gè)傳入值:
res = pool.apply_async(job, (2,3,4,))
結(jié)果會報(bào)錯(cuò):
TypeError: job() takes exactly 1 argument (3 given)
即apply_async()只能輸入一組參數(shù)。
在此我們將apply_async()放入迭代器中,定義一個(gè)新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
同樣在取出值時(shí)需要一個(gè)一個(gè)取出來
print([res.get() for res in multi_res])
apply用迭代器的運(yùn)行結(jié)果與map取出的結(jié)果相同。
note:
(1)Pool默認(rèn)調(diào)用是CPU的核數(shù),傳入processes參數(shù)可自定義CPU核數(shù)
(2)map() 放入迭代參數(shù),返回多個(gè)結(jié)果
(3)apply_async()只能放入一組參數(shù),并返回一個(gè)結(jié)果,如果想得到map()的效果需要通過迭代
1.6 共享內(nèi)存 shared memory
只有通過共享內(nèi)存才能讓CPU之間進(jìn)行交流。
通過Value將數(shù)據(jù)存儲在一個(gè)共享的內(nèi)存表中。
import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
其中,i和d表示數(shù)據(jù)類型。i為帶符號的整型,d為雙精浮點(diǎn)類型。更多數(shù)據(jù)類型可參考網(wǎng)址:https://docs.python.org/3/library/array.html
在多進(jìn)程中有一個(gè)Array類,可以和共享內(nèi)存交互,來實(shí)現(xiàn)進(jìn)程之間共享數(shù)據(jù)。
和numpy中的不同,這里的Array只能是一維的,并且需要定義數(shù)據(jù)類型否則會報(bào)錯(cuò)。
array = mp.Array('i', [1, 2, 3, 4])
1.7 進(jìn)程鎖 Lock
首先是不加進(jìn)程鎖的運(yùn)行情況,在下述代碼中定義了共享變量v,定義了兩個(gè)進(jìn)程,均可對v進(jìn)行操作。job函數(shù)的作用是每隔0.1s輸出一次累加num的值,累加值num在兩個(gè)進(jìn)程中分別為1和3。
import multiprocessing as mp
import time
def job(v,num):
for _ in range(10):
time.sleep(0.1)#暫停0.1s,讓輸出效果更明顯
v.value+=num #v.value獲取共享變量值
print(v.value)
def multicore():
v=mp.Value('i',0)#定義共享變量
p1=mp.Process(target=job,args=(v,1))
p2=mp.Process(target=job,args=(v,3))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__=='__main__':
multicore()
運(yùn)行結(jié)果如下所示:

可以看到兩個(gè)進(jìn)程互相搶占共享內(nèi)存v。
為了解決上述不同進(jìn)程搶共享資源的問題,我們可以用加進(jìn)程鎖來解決。
首先需要定義一個(gè)進(jìn)程鎖:
l = mp.Lock() # 定義一個(gè)進(jìn)程鎖
然后將進(jìn)程鎖的信息傳入各個(gè)進(jìn)程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要將Lock傳入 p2 = mp.Process(target=job, args=(v,3,l))
在job()中設(shè)置進(jìn)程鎖的使用,保證運(yùn)行時(shí)一個(gè)進(jìn)程的對鎖內(nèi)內(nèi)容的獨(dú)占
def job(v, num, l): l.acquire() # 鎖住 for _ in range(5): time.sleep(0.1) v.value += num # v.value獲取共享內(nèi)存 print(v.value) l.release() # 釋放
完整代碼:
def job(v, num, l):
l.acquire() # 鎖住
for _ in range(5):
time.sleep(0.1)
v.value += num # 獲取共享內(nèi)存
print(v.value)
l.release() # 釋放
def multicore():
l = mp.Lock() # 定義一個(gè)進(jìn)程鎖
v = mp.Value('i', 0) # 定義共享內(nèi)存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
運(yùn)行結(jié)果如下所示:

可以看到進(jìn)程1運(yùn)行完之后才運(yùn)行進(jìn)程2。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Python多進(jìn)程并發(fā)(multiprocessing)用法實(shí)例詳解
- Python3多進(jìn)程 multiprocessing 模塊實(shí)例詳解
- Python多進(jìn)程multiprocessing用法實(shí)例分析
- python multiprocessing多進(jìn)程變量共享與加鎖的實(shí)現(xiàn)
- Python標(biāo)準(zhǔn)庫之多進(jìn)程(multiprocessing包)介紹
- python基于multiprocessing的多進(jìn)程創(chuàng)建方法
- python multiprocessing 多進(jìn)程并行計(jì)算的操作
- Python使用multiprocessing實(shí)現(xiàn)多進(jìn)程的詳細(xì)步驟記錄
相關(guān)文章
基于Python實(shí)現(xiàn)自動化文檔整理工具
一個(gè)人可能會在計(jì)算機(jī)上存儲大量的照片、視頻和文檔文件,這些文件可能散落在不同的文件夾中,難以管理和查找。所以本文就來用Python制作一個(gè)自動化文檔整理工具吧2023-04-04
基于Python實(shí)現(xiàn)原創(chuàng)程序猿乘風(fēng)破浪小游戲
最近學(xué)習(xí)了一丁點(diǎn)Pygame技能,感覺有點(diǎn)上頭,一波操作創(chuàng)作“程序猿乘風(fēng)破浪”游戲一款,文中的示例代碼講解詳細(xì),希望大家能夠喜歡2023-02-02
python人工智能tensorflow函數(shù)np.random模塊使用
這篇文章主要為大家介紹了python人工智能tensorflow函數(shù)np.random模塊使用方法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Python使用psycopg2連接PostgreSQL數(shù)據(jù)庫的步驟
PostgreSQL 是一個(gè)廣泛使用的開源對象關(guān)系數(shù)據(jù)庫系統(tǒng),以其強(qiáng)大的功能和靈活性而聞名,Python,作為一種流行的編程語言,提供了多種方式與數(shù)據(jù)庫交互,其中 psycopg2 是連接 PostgreSQL 數(shù)據(jù)庫的流行選擇之一,本文介紹了Python使用psycopg2連接PostgreSQL數(shù)據(jù)庫的步驟2024-12-12

