python基礎(chǔ)之并發(fā)編程(二)
一、多進(jìn)程的實(shí)現(xiàn)
方法一
# 方法包裝 多進(jìn)程
from multiprocessing import Process
from time import sleep
def func1(arg):
print(f'{arg}開始...')
sleep(2)
print(f'{arg}結(jié)束...')
if __name__ == "__main__":
p1 = Process(target=func1,args=('p1',))
p2 = Process(target=func1,args=('p2',))
p1.start()
p2.start()
方法二:
二、使用進(jìn)程的優(yōu)缺點(diǎn)
1、優(yōu)點(diǎn)
- 可以使用計算機(jī)多核,進(jìn)行任務(wù)的并發(fā)執(zhí)行,提高執(zhí)行效率
- 運(yùn)行不受其他進(jìn)程影響,創(chuàng)建方便
- 空間獨(dú)立,數(shù)據(jù)安全
2、缺點(diǎn)
- 進(jìn)程的創(chuàng)建和刪除消耗的系統(tǒng)資源較多
三、進(jìn)程的通信
Python 提供了多種實(shí)現(xiàn)進(jìn)程間通信的機(jī)制,主要有以下 2 種:
1. Python multiprocessing 模塊下的 Queue 類,提供了多個進(jìn)程之間實(shí)現(xiàn)通信的諸多 方法
2. Pipe,又被稱為“管道”,常用于實(shí)現(xiàn) 2 個進(jìn)程之間的通信,這 2 個進(jìn)程分別位于管 道的兩端
Pipe 直譯過來的意思是“管”或“管道”,該種實(shí)現(xiàn)多進(jìn)程編程的方式,和實(shí)際生活中 的管(管道)是非常類似的。通常情況下,管道有 2 個口,而 Pipe 也常用來實(shí)現(xiàn) 2 個進(jìn)程之 間的通信,這 2 個進(jìn)程分別位于管道的兩端,一端用來發(fā)送數(shù)據(jù),另一端用來接收數(shù)據(jù) - send(obj)
發(fā)送一個 obj 給管道的另一端,另一端使用 recv() 方法接收。需要說明的是,該 obj 必 須是可序列化的,如果該對象序列化之后超過 32MB,則很可能會引發(fā) ValueError 異常 - recv()
接收另一端通過 send() 方法發(fā)送過來的數(shù)據(jù) - close()
關(guān)閉連接 - poll([timeout])
返回連接中是否還有數(shù)據(jù)可以讀取 - end_bytes(buffer[, offset[, size]])
發(fā)送字節(jié)數(shù)據(jù)。如果沒有指定 offset、size 參數(shù),則默認(rèn)發(fā)送 buffer 字節(jié)串的全部數(shù) 據(jù);如果指定了 offset 和 size 參數(shù),則只發(fā)送 buffer 字節(jié)串中從 offset 開始、長度為 size 的字節(jié)數(shù)據(jù)。通過該方法發(fā)送的數(shù)據(jù),應(yīng)該使用 recv_bytes() 或 recv_bytes_into 方法接收 - recv_bytes([maxlength])
接收通過 send_bytes() 方法發(fā)送的數(shù)據(jù),maxlength 指定最多接收的字節(jié)數(shù)。該方法返 回接收到的字節(jié)數(shù)據(jù) - recv_bytes_into(buffer[, offset])
功能與 recv_bytes() 方法類似,只是該方法將接收到的數(shù)據(jù)放在 buffer 中
1、Queue 實(shí)現(xiàn)進(jìn)程間通信
from multiprocessing import Process,current_process,Queue # current_process 指的是當(dāng)前進(jìn)程
# from queue import Queue
import os
def func(name,mq):
print('進(jìn)程ID {} 獲取了數(shù)據(jù):{}'.format(os.getpid(),mq.get()))
mq.put('shiyi')
if __name__ == "__main__":
# print('進(jìn)程ID:{}'.format(current_process().pid))
# print('進(jìn)程ID:{}'.format(os.getpid()))
mq = Queue()
mq.put('yangyang')
p1 = Process(target=func,args=('p1',mq))
p1.start()
p1.join()
print(mq.get())
2、Pipe 實(shí)現(xiàn)進(jìn)程間通信(一邊發(fā)送send(obj),一邊接收(obj))
from multiprocessing import Process,current_process,Pipe
import os
def func(name,con):
print('進(jìn)程ID {} 獲取了數(shù)據(jù):{}'.format(os.getpid(),con.recv()))
con.send('你好!')
if __name__ == "__main__":
# print('進(jìn)程ID:{}'.format(current_process().pid))
con1,con2 = Pipe()
p1 = Process(target=func,args=('p1',con1))
p1.start()
con2.send("hello!")
p1.join()
print(con2.recv())
四、Manager管理器
管理器提供了一種創(chuàng)建共享數(shù)據(jù)的方法,從而可以在不同進(jìn)程中共享
from multiprocessing import Process,current_process
import os
from multiprocessing import Manager
def func(name,m_list,m_dict):
print('子進(jìn)程ID {} 獲取了數(shù)據(jù):{}'.format(os.getpid(),m_list))
print('子進(jìn)程ID {} 獲取了數(shù)據(jù):{}'.format(os.getpid(),m_dict))
m_list.append('你好')
m_dict['name'] = 'shiyi'
if __name__ == "__main__":
print('主進(jìn)程ID:{}'.format(current_process().pid))
with Manager() as mgr:
m_list = mgr.list()
m_dict = mgr.dict()
m_list.append('Hello!!')
p1 = Process(target=func,args=('p1',m_list,m_dict))
p1.start()
p1.join()
print(m_list)
print(m_dict)
五、進(jìn)程池
Python 提供了更好的管理多個進(jìn)程的方式,就是使用進(jìn)程池。
進(jìn)程池可以提供指定數(shù)量的進(jìn)程給用戶使用,即當(dāng)有新的請求提交到進(jìn)程池中時,如果池 未滿,則會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;反之,如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大 值,那么該請求就會等待,只要池中有進(jìn)程空閑下來,該請求就能得到執(zhí)行。
使用進(jìn)程池的優(yōu)點(diǎn)
1. 提高效率,節(jié)省開辟進(jìn)程和開辟內(nèi)存空間的時間及銷毀進(jìn)程的時間
2. 節(jié)省內(nèi)存空間
| 類/方法 | 功能 | 參數(shù) |
|
Pool(processes)
|
創(chuàng)建進(jìn)程池對象
|
processes 表示進(jìn)程池
中有多少進(jìn)程
|
|
pool.apply_async(func,a
rgs,kwds)
|
異步執(zhí)行 ;將事件放入到進(jìn) 程池隊列
|
func 事件函數(shù)
args 以元組形式給
func 傳參
kwds 以字典形式給
func 傳參 返回值:返
回一個代表進(jìn)程池事件的對
象,通過返回值的 get 方法
可以得到事件函數(shù)的返回值
|
|
pool.apply(func,args,kw
ds)
|
同步執(zhí)行;將事件放入到進(jìn)程 池隊列
|
func 事件函數(shù) args 以
元組形式給 func 傳參
kwds 以字典形式給 func
傳參
|
|
pool.close()
|
關(guān)閉進(jìn)程池
|
|
|
pool.join()
|
回收進(jìn)程池
|
|
|
pool.map(func,iter)
|
類似于 python 的 map 函
數(shù),將要做的事件放入進(jìn)程池
|
func 要執(zhí)行的函數(shù)
iter 迭代對象
|
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"當(dāng)前進(jìn)程的ID:{os.getpid()},{name}")
sleep(2)
return name
def func2(args):
print(args)
if __name__ == "__main__":
pool = Pool(5)
pool.apply_async(func = func1,args=('t1',),callback=func2)
pool.apply_async(func = func1,args=('t2',),callback=func2)
pool.apply_async(func = func1,args=('t3',),callback=func2)
pool.apply_async(func = func1,args=('t4',))
pool.apply_async(func = func1,args=('t5',))
pool.apply_async(func = func1,args=('t6',))
pool.close()
pool.join()
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"當(dāng)前進(jìn)程的ID:{os.getpid()},{name}")
sleep(2)
return name
if __name__ == "__main__":
with Pool(5) as pool:
args = pool.map(func1,('t1,','t2,','t3,','t4,','t5,','t6,','t7,','t8,'))
for a in args:
print(a)
總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
python中24小時制轉(zhuǎn)換為12小時制的方法
最近需要實(shí)現(xiàn)一個需求,求用戶輸入24小時制的時間,然后顯示12小時制的時間。具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-06-06
python使用matplotlib:subplot繪制多個子圖的示例
這篇文章主要介紹了python使用matplotlib:subplot繪制多個子圖的示例,幫助大家更好的利用python繪制圖像,感興趣的朋友可以了解下2020-09-09
Python基于多線程操作數(shù)據(jù)庫相關(guān)問題分析
這篇文章主要介紹了Python基于多線程操作數(shù)據(jù)庫相關(guān)問題,結(jié)合實(shí)例形式分析了Python使用數(shù)據(jù)庫連接池并發(fā)操作數(shù)據(jù)庫避免超時、連接丟失相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2018-07-07

