Python中的進程操作模塊(multiprocess.process)
一、multiprocess模塊
multiprocess不是一個模塊而是python中一個操作、管理進程的包。
子模塊分為四個部分:
- 創(chuàng)建進程部分(multiprocess.process)
- 進程同步部分((multiprocess.Lock))
- 進程池部分((multiprocess.Pool))
- 進程之間數(shù)據(jù)共享(ThreadLocal、multiprocess.Queue、Pipes)
二、multiprocess.process模塊
process模塊是一個創(chuàng)建進程的模塊,借助這個模塊,就可以完成進程的創(chuàng)建。
在windows中使用process模塊的注意事項
在Windows操作系統(tǒng)中由于沒有fork(linux操作系統(tǒng)中創(chuàng)建進程的機制),在創(chuàng)建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執(zhí)行了整個文件。
因此如果將process()直接寫在文件中就會無限遞歸創(chuàng)建子進程報錯。所以必須把創(chuàng)建子進程的部分使用if \_\_name\_\_ =='\_\_main\_\_' 判斷保護起來,import 的時候,就不會遞歸運行了。
1、使用process模塊創(chuàng)建進程
在一個python進程中開啟子進程,start方法和并發(fā)效果。
1 在Python中啟動的第一個子進程
import time
from multiprocessing import Process
def f(name):
print('hello', name)
time.sleep(1)
print('我是子進程')
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
# p.join()
print('我是父進程')2、 查看主進程和子進程的進程號
import os
from multiprocessing import Process
def f(x):
print('子進程id :',os.getpid(),'父進程id :',os.getppid())
return x*x
if __name__ == '__main__':
print('主進程id :', os.getpid())
p_lst = []
for i in range(5):
p = Process(target=f, args=(i,))
p.start()3、 進階,多個進程同時運行
注意,子進程的執(zhí)行順序不是根據(jù)啟動順序決定的。
import time
from multiprocessing import Process
def f(name):
print('hello', name)
time.sleep(1)
if __name__ == '__main__':
p_lst = []
for i in range(5):
p = Process(target=f, args=('bob',))
p.start()
p_lst.append(p)
p.join()
# [p.join() for p in p_lst]
print('父進程在執(zhí)行')4、 通過繼承Process類開啟進程
import os
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print(os.getpid())
print('%s 正在和女主播聊天' %self.name)
p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')
p1.start() # start會自動調用run
p2.start()
# p2.run()
p3.start()
p1.join()
p2.join()
p3.join()
print('主線程')5、 進程之間的數(shù)據(jù)隔離問題
from multiprocessing import Process
def work():
global n
n=0
print('子進程內(nèi): ',n)
if __name__ == '__main__':
n = 100
p=Process(target=work)
p.start()
print('主進程內(nèi): ',n)2、守護進程daemon
會隨著主進程的結束而結束。
主進程創(chuàng)建守護進程
其一:守護進程會在主進程代碼執(zhí)行結束后就終止
其二:守護進程內(nèi)無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止。
1、 守護進程的啟動
import os
import time
from multiprocessing import Process
class Myprocess(Process):
def __init__(self,person):
super().__init__()
self.person = person
def run(self):
print(os.getpid(),self.name)
print('%s正在和女主播聊天' %self.person)
p=Myprocess('哪吒')
p.daemon=True # 一定要在p.start()前設置,設置p為守護進程,禁止p創(chuàng)建子進程,并且父進程代碼執(zhí)行結束,p即終止運行
p.start()
time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
print('主')2、 主進程代碼執(zhí)行結束守護進程立即結束
from multiprocessing import Process
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
p1=Process(target=foo)
p2=Process(target=bar)
p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------") # 打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執(zhí)行的打印信息123,因為主進程打印main----時,p1也執(zhí)行了,但是隨即被終止,p2可以打印出來。3、socket聊天并發(fā)實例
from socket import *
from multiprocessing import Process
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__': # windows下start進程一定要寫到這下面
while True:
conn,client_addr=server.accept()
p=Process(target=talk,args=(conn,client_addr))4、進程對象的其他方法:terminate和is_alive
from multiprocessing import Process
import time
import random
class Myprocess(Process):
def __init__(self,person):
self.name=person
super().__init__()
def run(self):
print('%s正在和網(wǎng)紅臉聊天' %self.name)
time.sleep(random.randrange(1,5))
print('%s還在和網(wǎng)紅臉聊天' %self.name)
p1=Myprocess('哪吒')
p1.start()
p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True
print('開始')
print(p1.is_alive()) #結果為False5、進程對象的其他屬性:pid和name
class Myprocess(Process):
def __init__(self,person):
self.name=person # name屬性是Process中的屬性,標示進程的名字
super().__init__() # 執(zhí)行父類的初始化方法會覆蓋name屬性
# self.name = person # 在這里設置就可以修改進程名字了
# self.person = person # 如果不想覆蓋進程名,就修改屬性名稱就可以了
def run(self):
print('%s正在和網(wǎng)紅臉聊天' %self.name)
# print('%s正在和網(wǎng)紅臉聊天' %self.person)
time.sleep(random.randrange(1,5))
print('%s正在和網(wǎng)紅臉聊天' %self.name)
# print('%s正在和網(wǎng)紅臉聊天' %self.person)
p1=Myprocess('哪吒')
p1.start()
print(p1.pid) #可以查看子進程的進程id6、參考:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)
強調:
1.需要使用關鍵字的方式來指定參數(shù)
2.args指定的為傳給target函數(shù)的位置參數(shù),是一個元組形式,必須有逗號
參數(shù)介紹:
•group參數(shù)未使用,值始終為None
•target表示調用對象,即子進程要執(zhí)行的任務
•args表示調用對象的位置參數(shù)元組,args=(1,2,'egon',)
•kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
•name為子進程的名稱
1、 方法介紹
•p.start():啟動進程,并調用該子進程中的p.run()
•p.run():進程啟動時運行的方法,正是它去調用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法
•p.terminate():強制終止進程p,不會進行任何清理操作,如果p創(chuàng)建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
•p.is_alive():如果p仍然運行,返回True
•p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀態(tài),而p是處于運行的狀態(tài))。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
2、 屬性介紹
•p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止,并且設定為True后,p不能創(chuàng)建自己的新進程,必須在p.start()之前設置
•p.name:進程的名稱
•p.pid:進程的pid
•p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
•p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網(wǎng)絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
二、進程同步(multiprocess.Lock)
當多個進程使用同一份數(shù)據(jù)資源的時候,就會引發(fā)數(shù)據(jù)安全或順序混亂問題。
1、多進程搶占輸出資源
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s:%s is done' % (n, os.getpid()))
if __name__ == '__main__':
for i in range(3):
p = Process(target=work, args=(i,))
p.start()
# 0: 15620 is running
# 1: 19688 is running
# 2: 15892 is running
# 1:19688 is done
# 0:15620 is done
# 2:15892 is done2、使用鎖維護執(zhí)行順序
由并發(fā)變成了串行,犧牲了運行效率,但避免了競爭,確實會浪費了時間,卻保證了數(shù)據(jù)的安全。
import os
import time
import random
from multiprocessing import Process,Lock
def work(lock,n):
lock.acquire()
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,i))
p.start()
# 1: 24776 is running
# 1: 24776 is done
# 0: 23588 is running
# 0: 23588 is done
# 2: 27308 is running
# 2: 27308 is done3、多進程同時搶購余票
# 文件db的內(nèi)容為:{"count":5}
# 注意一定要用雙引號,不然json無法識別
# 并發(fā)運行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯亂
from multiprocessing import Process, Lock
import time, json, random
def search():
dic = json.load(open('db'))
print('剩余票數(shù)%s' % dic['count'])
def get():
dic = json.load(open('db'))
time.sleep(random.random()) # 模擬讀數(shù)據(jù)的網(wǎng)絡延遲
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.random()) # 模擬寫數(shù)據(jù)的網(wǎng)絡延遲
json.dump(dic, open('db', 'w'))
print('購票成功')
else:
print('購票失敗')
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10): # 模擬并發(fā)10個客戶端搶票
p = Process(target=task, args=(lock,))
p.start()雖然可以用文件共享數(shù)據(jù)實現(xiàn)進程間通信,但問題是:
- 效率低(共享數(shù)據(jù)基于文件,而文件是硬盤上的數(shù)據(jù))
- 需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
- 效率高(多個進程共享一塊內(nèi)存的數(shù)據(jù))
- 幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。
隊列和管道都是將數(shù)據(jù)存放于內(nèi)存中,隊列又是基于(管道+鎖)實現(xiàn)的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數(shù)目增多時,往往可以獲得更好的可獲展性。
三、進程間通信IPC(Inter-Process Communication) (multiprocess.Queue)
1、 概念介紹——隊列multiprocess.Queue
創(chuàng)建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現(xiàn)多進程之間的數(shù)據(jù)傳遞。
Queue([maxsize])創(chuàng)建共享的進程隊列。
參數(shù) :maxsize是隊列中允許的最大項數(shù)。如果省略此參數(shù),則無大小限制。
底層隊列使用管道和鎖定實現(xiàn)。另外,還需要運行支持線程以便隊列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?/p>
2、 方法介紹
q.get( [ block [ ,timeout ] ] ):返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,默認為True. 如果設置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內(nèi)沒有項目變?yōu)榭捎?,將引發(fā)Queue.Empty異常。q.get_nowait():同q.get(False)方法。q.put(item [, block [,timeout ] ] ):將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發(fā)Queue.Full異常。q.qsize():返回隊列中目前項目的正確數(shù)量。此函數(shù)的結果并不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。q.empty():如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經(jīng)加入新的項目。q.full():如果q已滿,返回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。q.close():關閉隊列,防止隊列中加入更多數(shù)據(jù)。調用此方法時,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù),但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數(shù)據(jù)結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產(chǎn)者中的隊列不會導致get()方法返回錯誤。q.cancel_join_thread():不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。q.join_thread():連接隊列的后臺線程。此方法用于在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創(chuàng)建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
3、代碼實例——multiprocess.Queue
1、 單看隊列用法
這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現(xiàn)象。
'''
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基于消息傳遞實現(xiàn)的,但是隊列接口
'''
from multiprocessing import Queue
q = Queue(3)
# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(2)
q.put(1)
# q.put(3) # 如果隊列已經(jīng)滿了,程序就會停在這里,等待數(shù)據(jù)被別人取走,再將數(shù)據(jù)放入隊列。
# 如果隊列中的數(shù)據(jù)一直不被取走,程序就會永遠停在這里。
try:
q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
print('隊列已經(jīng)滿了')
# 因此,我們再放入數(shù)據(jù)之前,可以先看一下隊列的狀態(tài),如果已經(jīng)滿了,就不繼續(xù)put了。
print(q.full()) # True
print(q.get()) # 3
print(q.get()) # 2
print(q.get()) # 1
# print(q.get()) # 同put方法一樣,如果隊列已經(jīng)空了,那么繼續(xù)取就會出現(xiàn)阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
print('隊列已經(jīng)空了')
print(q.empty()) # True,空了2、 子進程發(fā)送數(shù)據(jù)給父進程
一個queue的簡單應用,使用隊列q對象調用get函數(shù)來取得隊列中最先進入的數(shù)據(jù)。
import time
from multiprocessing import Process, Queue
def f(q):
q.put([time.asctime(), 'from Eva', 'hello']) # 調用主函數(shù)中p進程傳遞過來的進程參數(shù) put函數(shù)為向隊列中添加一條數(shù)據(jù)。
if __name__ == '__main__':
q = Queue() # 創(chuàng)建一個Queue對象
p = Process(target=f, args=(q,)) # 創(chuàng)建一個進程
p.start()
print(q.get())
p.join()
# ['Mon Dec 9 18:27:08 2019', 'from Eva', 'hello']3、 批量生產(chǎn)數(shù)據(jù)放入隊列再批量獲取結果
import os
import time
import multiprocessing
# 向queue中輸入數(shù)據(jù)的函數(shù)
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)
# 向queue中輸出數(shù)據(jù)的函數(shù)
def outputQ(queue):
info= queue.get()
print('%s%s%s' % (str(os.getpid()), '(get):', info))
# Main
if __name__ == '__main__':
multiprocessing.freeze_support()
record1 = [] # store input processes
record2 = [] # store output processes
queue = multiprocessing.Queue(3)
# 輸入進程
for i in range(10):
process = multiprocessing.Process(target=inputQ, args=(queue,))
process.start()
record1.append(process)
# 輸出進程
for i in range(10):
process = multiprocessing.Process(target=outputQ, args=(queue,))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
# 17568(get):3208(put):Mon Dec 9 18:29:17 2019
# 27620(get):24024(put):Mon Dec 9 18:29:17 2019
# 19780(get):21716(put):Mon Dec 9 18:29:17 2019
# 27576(get):27608(put):Mon Dec 9 18:29:17 2019
# 11304(get):10668(put):Mon Dec 9 18:29:18 2019
# 19732(get):20548(put):Mon Dec 9 18:29:18 2019
# 18120(get):25360(put):Mon Dec 9 18:29:18 2019
# 24752(get):21764(put):Mon Dec 9 18:29:18 2019
# 19848(get):7604(put):Mon Dec 9 18:29:18 2019
# 13888(get):10376(put):Mon Dec 9 18:29:18 20194、生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
1、 為什么要使用生產(chǎn)者和消費者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
2、 什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
3、 基于隊列實現(xiàn)生產(chǎn)者消費者模型
import os
import random
import time
from multiprocessing import Process, Queue
def producer(q):
for i in range(10):
time.sleep(random.randint(1, 3))
res = '包子%s' % i
q.put(res)
print('%s 生產(chǎn)了 %s' % (os.getpid(), res))
def consumer(q):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (os.getpid(), res))
if __name__ == '__main__':
q = Queue()
# 生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=(q,))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
# 開始
p1.start()
c1.start()
print('主')此時的問題是主進程永遠不會結束,原因是:生產(chǎn)者p在生產(chǎn)完后就結束了,但是消費者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊列中再發(fā)一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環(huán)。
4、 改良版——生產(chǎn)者消費者模型
注意:結束信號None,不一定要由生產(chǎn)者發(fā),主進程里同樣可以發(fā),但主進程需要等生產(chǎn)者結束后才應該發(fā)送該信號。
from multiprocessing import Process, Queue
import time, random, os
def producer(q):
for i in range(10):
time.sleep(random.randint(1, 3))
res = '包子%s' % i
q.put(res)
print('%s 生產(chǎn)了 %s' % (os.getpid(), res))
q.put(None)# 發(fā)送結束信號
def consumer(q):
while True:
res = q.get()
if res is None: break # 收到結束信號則結束
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (os.getpid(), res))
if __name__ == '__main__':
q = Queue()
# 生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=(q,))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
# 開始
p1.start()
c1.start()
print('主')5、 主進程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結束信號None
from multiprocessing import Process, Queue
import time, random, os
def producer(q):
for i in range(2):
time.sleep(random.randint(1, 3))
res = '包子%s' % i
q.put(res)
print('%s 生產(chǎn)了 %s' % (os.getpid(), res))
def consumer(q):
while True:
res = q.get()
if res is None: break # 收到結束信號則結束
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (os.getpid(), res))
if __name__ == '__main__':
q = Queue()
# 生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=(q,))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
# 開始
p1.start()
c1.start()
p1.join()
q.put(None)# 發(fā)送結束信號
print('主')但上述解決方式,在有多個生產(chǎn)者和多個消費者時,我們則需要用一個很low的方式去解決
6、 多個消費者的例子:有幾個消費者就需要發(fā)送幾次結束信號
from multiprocessing import Process, Queue
import time, random, os
def producer(name, q):
for i in range(2):
time.sleep(random.randint(1, 3))
res = '%s%s' % (name, i)
q.put(res)
print('%s 生產(chǎn)了 %s' % (os.getpid(), res))
def consumer(q):
while True:
res = q.get()
if res is None: break # 收到結束信號則結束
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (os.getpid(), res))
if __name__ == '__main__':
q = Queue()
# 生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=('包子', q))
p2 = Process(target=producer, args=('骨頭', q))
p3 = Process(target=producer, args=('泔水', q))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
# 開始
p1.start()
p2.start()
p3.start()
c1.start()
p1.join() # 必須保證生產(chǎn)者全部生產(chǎn)完畢,才應該發(fā)送結束信號
p2.join()
p3.join()
q.put(None) # 有幾個消費者就應該發(fā)送幾次結束信號None
q.put(None) # 發(fā)送結束信號
print('主')5、JoinableQueue([maxsize])可連接的共享進程隊列
創(chuàng)建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產(chǎn)者項目已經(jīng)被成功處理。通知進程是使用共享的信號和條件變量來實現(xiàn)的。
1、 方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done():使用者使用此方法發(fā)出信號,表示q.get()返回的項目已經(jīng)被處理。如果調用此方法的次數(shù)大于從隊列中刪除的項目數(shù)量,將引發(fā)ValueError異常。q.join():生產(chǎn)者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續(xù)到為隊列中的每個項目均調用q.task_done()方法為止。
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產(chǎn)者將項目放入隊列,并等待它們被處理。
2、 JoinableQueue隊列實現(xiàn)消費之生產(chǎn)者模型
from multiprocessing import Process, JoinableQueue
import time, random, os
def producer(name, q):
for i in range(10):
time.sleep(random.randint(1, 3))
res = '%s%s' % (name, i)
q.put(res)
print('%s 生產(chǎn)了 %s' % (os.getpid(), res))
q.join() # 生產(chǎn)完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。
def consumer(q):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('%s 吃 %s' % (os.getpid(), res))
q.task_done() # 向q.join()發(fā)送一次信號,證明一個數(shù)據(jù)已經(jīng)被取走了
if __name__ == '__main__':
q = JoinableQueue()
# 生產(chǎn)者們:即廚師們
p1 = Process(target=producer, args=('包子', q))
p2 = Process(target=producer, args=('骨頭', q))
p3 = Process(target=producer, args=('泔水', q))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.daemon = True
c2.daemon = True
# 開始
p_l = [p1, p2, p3, c1, c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print('主')
# 主進程等--->p1,p2,p3等---->c1,c2
# p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊列的數(shù)據(jù)
# 因而c1,c2也沒有存在的價值了,不需要繼續(xù)阻塞在進程中影響主進程了。應該隨著主進程的結束而結束,所以設置成守護進程就可以了。四、進程池(multiprocess.Pool)
1、概念介紹——multiprocess.Pool
Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進程池
- numprocess:要創(chuàng)建的進程數(shù),如果省略,將默認使用
cpu_count()的值 - initializer:是每個工作進程啟動時要執(zhí)行的可調用對象,默認為None
- initargs:是要傳給initializer的參數(shù)組
2、主要方法
p.apply(func [, args [, kwargs]]):在一個池工作進程中執(zhí)行func(*args,**kwargs),然后返回結果。需要強調的是:此操作并不會在所有池工作進程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調用p.apply()函數(shù)或者使用p.apply_async()p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執(zhí)行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入?yún)?shù)。當func的結果變?yōu)榭捎脮r,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結果。p.close():關閉進程池,防止進一步操作。如果所有操作持續(xù)掛起,它們將在工作進程終止前完成P.join():等待所有工作進程退出。此方法只能在close()或teminate()之后調用obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內(nèi)還沒有到達,將引發(fā)一場。如果遠程操作中引發(fā)了異常,它將在調用此方法時再次被引發(fā)。obj.ready():如果調用完成,返回Trueobj.successful():如果調用完成且沒有引發(fā)異常,返回True,如果在結果就緒之前調用此方法,引發(fā)異常obj.wait([timeout]):等待結果變?yōu)榭捎谩?/li>obj.terminate():立即終止所有工作進程,同時不執(zhí)行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數(shù)。
3、代碼實例——multiprocess.Pool
1、 同步
import os,time
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3)
#進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,))
# 同步調用,直到本次任務執(zhí)行完畢拿到res,等待任務work執(zhí)行的過程中可能有阻塞也可能沒有阻塞
# 但不管該任務是否存在阻塞,同步調用都會在原地等著
print(res_l)2、 異步
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' % os.getpid())
time.sleep(random.random())
return n ** 2
if __name__ == '__main__':
p = Pool(3) # 進程池中從無到有創(chuàng)建三個進程,以后一直是這三個進程在執(zhí)行任務
res_l = []
for i in range(10):
res= p.apply_async(work, args=(i,))
# 異步運行,根據(jù)進程池中有的進程數(shù),每次最多3個子進程在異步執(zhí)行
# 返回結果之后,將結果放入列表,歸還進程,之后再執(zhí)行新的任務
# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束
# 而是執(zhí)行完一個就釋放一個進程,這個進程就去接收新的任務。
res_l.append(res)
# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用join,等待進程池內(nèi)任務都處理完,然后可以用get收集結果
# 否則,主進程結束,進程池可能還沒來得及執(zhí)行,也就跟著一起結束了
p.close()
p.join()
for res in res_l:
print(res.get()
) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執(zhí)行,立刻獲取結果,也根本無需get4、進程池版socket并發(fā)聊天練習
1、 server
#Pool內(nèi)的進程數(shù)默認是cpu核數(shù),假設為4(查看方法os.cpu_count())
#開啟6個客戶端,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài)
#在每個進程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn):
print('進程pid: %s' %os.getpid())
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p=Pool(4)
while True:
conn,*_=server.accept()
p.apply_async(talk,args=(conn,))
# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問2、 client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))發(fā)現(xiàn):并發(fā)開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來。
5、回調函數(shù)
需要回調函數(shù)的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數(shù)去處理該結果,該函數(shù)即回調函數(shù)
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(shù)(主進程負責執(zhí)行),這樣主進程在執(zhí)行回調函數(shù)時就省去了I/O的過程,直接拿到的是任務的結果。
1、 使用多進程請求多個url來減少網(wǎng)絡等待浪費的時間
from multiprocessing import Pool
import requests
import json
import os
def get_page(url):
print('<進程%s> get %s' % (os.getpid(), url))
respone = requests.get(url)
if respone.status_code == 200:
return {'url': url, 'text': respone.text}
def pasrse_page(res):
print('<進程%s> parse %s' % (os.getpid(), res['url']))
parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
with open('db.txt', 'a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls = [
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
p = Pool(3)
res_l = []
for url in urls:
res =p.apply_async(get_page, args=(url,), callback=pasrse_page)
res_l.append(res)
p.close()
p.join()
print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經(jīng)傳給回調函數(shù)處理了
'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''2、 爬蟲實例
import re
from urllib.request import urlopen
from multiprocessing import Pool
def get_page(url,pattern):
response=urlopen(url).read().decode('utf-8')
return pattern,response
def parse_page(info):
pattern,page_content=info
res=re.findall(pattern,page_content)
for item in res:
dic={
'index':item[0].strip(),
'title':item[1].strip(),
'actor':item[2].strip(),
'time':item[3].strip(),
}
print(dic)
if __name__ == '__main__':
regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
pattern1=re.compile(regex,re.S)
url_dic={
'http://maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]
for url,pattern in url_dic.items():
res=p.apply_async(get_page,args=(url,pattern),
callback=parse_page) res_l.append(res) for i in res_l: i.get()6、無需回調函數(shù)
如果在主進程中等待進程池中所有任務都執(zhí)行完畢后,再統(tǒng)一處理結果,則無需回調函數(shù)。
from multiprocessing import Pool
import time,random,os
def work(n):
time.sleep(1)
return n**2
if __name__ == '__main__':
p=Pool()
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join() #等待進程池中所有進程執(zhí)行完畢
nums=[]
for res in res_l:
nums.append(res.get() ) #拿到所有結果
print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統(tǒng)一進行處理進程池的其他實現(xiàn)方法:https://docs.python.org/dev/library/concurrent.futures.html
到此這篇關于Python進程操作模塊的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
python爬蟲入門教程--快速理解HTTP協(xié)議(一)
http協(xié)議是互聯(lián)網(wǎng)里面最重要,最基礎的協(xié)議之一,我們的爬蟲需要經(jīng)常和http協(xié)議打交道。下面這篇文章主要給大家介紹了關于python爬蟲入門之快速理解HTTP協(xié)議的相關資料,文中介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。2017-05-05
python 使用Yolact訓練自己的數(shù)據(jù)集
這篇文章主要介紹了python 使用Yolact訓練自己的數(shù)據(jù)集,幫助大家更好的理解和學習使用python,感興趣的朋友可以了解下2021-04-04
Pandas DataFrame操作數(shù)據(jù)增刪查改
我們在用 pandas 處理數(shù)據(jù)的時候,經(jīng)常會遇到用其中一列數(shù)據(jù)替換另一列數(shù)據(jù)的場景。這一類的需求估計很多人都遇到,當然還有其它更復雜的。解決這類需求的辦法有很多,這里我們來推薦幾個,這篇文章主要介紹了Pandas DataFrame操作數(shù)據(jù)的增刪查改2022-10-10
Python實戰(zhàn)之OpenCV實現(xiàn)貓臉檢測
今天給大家?guī)淼氖顷P于Python的相關知識,文章圍繞著OpenCV實現(xiàn)貓臉檢測展開,文中有非常詳細的介紹及代碼示例,需要的朋友可以參考下2021-06-06

