Python全棧之協(xié)程詳解
1. 線(xiàn)程隊(duì)列
# ### 線(xiàn)程隊(duì)列
from queue import Queue
"""
put 存放 超出隊(duì)列長(zhǎng)度阻塞
get 獲取 超出隊(duì)列長(zhǎng)度阻塞
put_nowait 存放,超出隊(duì)列長(zhǎng)度報(bào)錯(cuò)
get_nowait 獲取,超出隊(duì)列長(zhǎng)度報(bào)錯(cuò)
"""
# (1) Queue
"""先進(jìn)先出,后進(jìn)先出"""
q = Queue()
q.put(100)
q.put(200)
print(q.get())
# print(q.get())
# print(q.get()) 阻塞
# print(q.get_nowait())
# print(q.get_nowait()) 報(bào)錯(cuò)
# Queue(3) => 指定隊(duì)列長(zhǎng)度, 元素個(gè)數(shù)只能是3個(gè);
q2 = Queue(3)
q2.put(1000)
q2.put(2000)
# q2.put(3000)
# q2.put(4000) 阻塞
q2.put_nowait(6000)
# q2.put_nowait(4000) 報(bào)錯(cuò)
# (2) LifoQueue
"""先進(jìn)后出,后進(jìn)先出(棧的特點(diǎn))"""
from queue import LifoQueue
lq = LifoQueue()
lq.put(110)
lq.put(120)
lq.put(119)
print(lq.get())
print(lq.get())
print(lq.get())
# (3) PriorityQueue
"""按照優(yōu)先級(jí)順序進(jìn)行排序存放(默認(rèn)從小到大)"""
"""在一個(gè)優(yōu)先級(jí)隊(duì)列中,要放同一類(lèi)型的數(shù)據(jù),不能混合使用"""
from queue import PriorityQueue
pq = PriorityQueue()
# 1.對(duì)數(shù)字進(jìn)行排序
pq.put(100)
pq.put(19)
pq.put(-90)
pq.put(88)
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())
# 2.對(duì)字母進(jìn)行排序 (按照ascii編碼)
pq.put("wangwen")
pq.put("sunjian")
pq.put('wangwei')
pq.put("王文")
pq.put("孫堅(jiān)")
pq.put('王維')
print( pq.get() )
print( pq.get() )
print( pq.get() )
print( pq.get() )
print( pq.get() )
print( pq.get() )
# 3.對(duì)容器進(jìn)行排序
pq.put( (22,"wangwen") )
pq.put( (67,"wangyuhan") )
pq.put( (3,"anxiaodong") )
pq.put( (3,"liuyubo") )
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())
# 4.注意點(diǎn)
pq.put(100)
pq.put("nihao")
pq.put( (1,2,3) )
2. 進(jìn)程池_線(xiàn)程池
知識(shí)點(diǎn):
# 線(xiàn)程池
# 實(shí)例化線(xiàn)程池 ThreadPoolExcutor (推薦5*cpu_count)
# 異步提交任務(wù) submit / map
# 阻塞直到任務(wù)完成 shutdown
# 獲取子線(xiàn)程的返回值 result
# 使用回調(diào)函數(shù) add_done_callback
# 線(xiàn)程池 是由子線(xiàn)程實(shí)現(xiàn)的
# 進(jìn)程池 是由主進(jìn)程實(shí)現(xiàn)的
程序?qū)崿F(xiàn):
# ### 進(jìn)程池 和 線(xiàn)程池
from concurrent.futures import ProcessPoolExecutor , ThreadPoolExecutor
import os,time,random
# 獲取的邏輯處理器
# print(os.cpu_count())
"""多條進(jìn)程提前開(kāi)辟,可觸發(fā)多cpu的并行效果"""
'''
# (1) 進(jìn)程池 ProcessPoolExecutor
def func(i):
# print(i)
time.sleep(random.uniform(0.1,0.8))
print(" 任務(wù)執(zhí)行中 ... start ... 進(jìn)程號(hào){}".format(os.getpid()) , i )
print(" 任務(wù)執(zhí)行中 ... end ... 進(jìn)程號(hào){}".format(os.getpid()))
return i
if __name__ == "__main__":
lst = []
# (1) 創(chuàng)建進(jìn)程池對(duì)象
"""默認(rèn)參數(shù)是 系統(tǒng)最大的邏輯核心數(shù) 4"""
p = ProcessPoolExecutor()
# (2) 異步提交任務(wù)
"""submit(任務(wù),參數(shù)1,參數(shù)2 ... )"""
"""默認(rèn)如果一個(gè)進(jìn)程短時(shí)間內(nèi)可以完成更多的任務(wù),進(jìn)程池就不會(huì)使用更多的進(jìn)程來(lái)輔助完成 , 可以節(jié)省系統(tǒng)資源的損耗;"""
for i in range(10):
obj = p.submit( func , i )
# print(obj)
# print(obj.result()) 不要寫(xiě)在這,導(dǎo)致程序同步,內(nèi)部有阻塞
lst.append(obj)
# (3) 獲取當(dāng)前任務(wù)的返回值
for i in lst:
print(i.result(),">===獲取返回值===?")
# (4) shutdown 等待所有進(jìn)程池里的進(jìn)程執(zhí)行完畢之后,在放行
p.shutdown()
print("進(jìn)程池結(jié)束 ... ")
'''
# (2) ThreadPoolExecutor
'''
# from threading import currentThread as ct
from threading import current_thread as ct
def func(i):
print(" 任務(wù)執(zhí)行中 ... start ... 線(xiàn)程號(hào){}".format( ct().ident ) , i )
time.sleep(1)
print(" 任務(wù)執(zhí)行中 ... end ... 線(xiàn)程號(hào){}".format(os.getpid()))
return ct().ident # 線(xiàn)程號(hào)
if __name__ == "__main__":
lst = []
setvar = set()
"""默認(rèn)參數(shù)是 系統(tǒng)最大的邏輯核心數(shù) 4 * 5 = 20"""
# (1) 創(chuàng)建線(xiàn)程池對(duì)象
t = ThreadPoolExecutor() # 20
# print(t)
# (2) 異步提交任務(wù)
"""默認(rèn)如果一個(gè)線(xiàn)程短時(shí)間內(nèi)可以完成更多的任務(wù),線(xiàn)程池就不會(huì)使用更多的線(xiàn)程來(lái)輔助完成 , 可以節(jié)省系統(tǒng)資源的損耗;"""
for i in range(100):
obj = t.submit(func,i)
lst.append(obj)
# (3) 獲取當(dāng)前任務(wù)的返回值
for i in lst:
setvar.add(i.result())
# (4) shutdown 等待所有線(xiàn)程池里的線(xiàn)程執(zhí)行完畢之后,在放行
t.shutdown()
print("主線(xiàn)程執(zhí)行結(jié)束 ... ")
print(setvar , len(setvar))
'''
# (3) 線(xiàn)程池 map
from threading import currentThread as ct
from collections import Iterator,Iterable
def func(i):
time.sleep(random.uniform(0.1,0.7))
print("thread ... 線(xiàn)程號(hào){}".format(ct().ident),i)
return "*" * i
if __name__ == "__main__":
t = ThreadPoolExecutor()
it = t.map(func,range(100))
# 返回的數(shù)據(jù)是迭代器
print(isinstance(it,Iterator))
# 協(xié)調(diào)子父線(xiàn)程,等待線(xiàn)程池中所有線(xiàn)程執(zhí)行完畢之后,在放行;
t.shutdown()
# 獲取迭代器里面的返回值
for i in it:
print(i)
"""
# 總結(jié): 無(wú)論是進(jìn)程池還是線(xiàn)程池,都是由固定的進(jìn)程數(shù)或者線(xiàn)程數(shù)來(lái)執(zhí)行所有任務(wù)
系統(tǒng)不會(huì)額外創(chuàng)建多余的進(jìn)程或者線(xiàn)程來(lái)執(zhí)行任務(wù);
"""
3. 回調(diào)函數(shù)
知識(shí)點(diǎn):
# 回調(diào)函數(shù)
就是一個(gè)參數(shù),將這個(gè)函數(shù)作為參數(shù)傳到另一個(gè)函數(shù)里面.
函數(shù)先執(zhí)行,再執(zhí)行當(dāng)參數(shù)傳遞的這個(gè)函數(shù),這個(gè)參數(shù)函數(shù)是回調(diào)函數(shù)
程序?qū)崿F(xiàn):
# ### 回調(diào)函數(shù)
"""
回調(diào)函數(shù): 回頭調(diào)用一下函數(shù)獲取最后結(jié)果
微信支付寶付款成功后, 獲取付款金額
微信支付寶退款成功后, 獲取退款金額
一般用在獲取最后的狀態(tài)值時(shí),使用回調(diào)
通過(guò)add_done_callback最后調(diào)用一下自定義的回調(diào)函數(shù);
"""
from concurrent.futures import ProcessPoolExecutor , ThreadPoolExecutor
from threading import currentThread as ct
import os,time,random
"""進(jìn)程任務(wù)"""
def func1(i):
time.sleep(random.uniform(0.1,0.9))
print(" 進(jìn)程任務(wù)執(zhí)行中 ... start ... 進(jìn)程號(hào){}".format(os.getpid()) , i )
print(" 進(jìn)程任務(wù)執(zhí)行中 ... end ... 進(jìn)程號(hào){}".format(os.getpid()) )
return i
def call_back1(obj):
print( "<==回調(diào)函數(shù)的進(jìn)程號(hào){}==>".format(os.getpid()) )
print(obj.result())
"""線(xiàn)程任務(wù)"""
def func2(i):
time.sleep(random.uniform(0.1,0.9))
print(" 線(xiàn)程任務(wù)執(zhí)行中 ... start ... 線(xiàn)程號(hào){}".format(ct().ident) , i )
print(" 線(xiàn)程任務(wù)執(zhí)行中 ... end ... 線(xiàn)程號(hào){}".format( ct().ident) )
return i
def call_back2(obj):
print( "<==回調(diào)函數(shù)的線(xiàn)程號(hào){}==>".format( ct().ident) )
print(obj.result())
if __name__ == "__main__":
"""
# (1)進(jìn)程池 結(jié)果:(進(jìn)程池的回調(diào)函數(shù)由主進(jìn)程執(zhí)行)
p = ProcessPoolExecutor() # os.cpu_count() => 4
for i in range(1,11):
obj = p.submit(func1 , i )
# 使用add_done_callback在獲取最后返回值的時(shí)候,可以異步并行
obj.add_done_callback(call_back1)
# 直接使用result獲取返回值的時(shí)候,會(huì)變成同步程序,速度慢;
# obj.result()
p.shutdown()
print( "主進(jìn)程執(zhí)行結(jié)束...進(jìn)程號(hào):" , os.getpid() )
"""
print("<==============================================>")
# (2)線(xiàn)程池 結(jié)果:(線(xiàn)程池的回調(diào)函數(shù)由子線(xiàn)程執(zhí)行)
t = ThreadPoolExecutor()
for i in range(1,11):
obj = t.submit(func2 , i )
# 使用add_done_callback在獲取最后返回值的時(shí)候,可以異步并發(fā)
obj.add_done_callback(call_back2)
# 直接使用result獲取返回值的時(shí)候,會(huì)變成同步程序,速度慢;
# obj.result()
t.shutdown()
print("主線(xiàn)程執(zhí)行結(jié)束 .... 線(xiàn)程號(hào){}".format(ct().ident))
"""
# 原型:
class Ceshi():
def add_done_callback(self,func):
print("系統(tǒng)執(zhí)行操作1 ... ")
print("系統(tǒng)執(zhí)行操作2 ... ")
# 回頭調(diào)用一下
func(self)
def result(self):
return 112233
def call_back(obj):
print(obj.result())
obj = Ceshi()
obj.add_done_callback(call_back)
"""
4. 協(xié)程
知識(shí)點(diǎn):
#協(xié)程也叫纖程: 協(xié)程是線(xiàn)程的一種實(shí)現(xiàn)方式.
指的是一條線(xiàn)程能夠在多任務(wù)之間來(lái)回切換的一種實(shí)現(xiàn).
對(duì)于CPU、操作系統(tǒng)來(lái)說(shuō),協(xié)程并不存在.
任務(wù)之間的切換會(huì)花費(fèi)時(shí)間.
目前電腦配置一般線(xiàn)程開(kāi)到200會(huì)阻塞卡頓.
#協(xié)程的實(shí)現(xiàn)
協(xié)程幫助你記住哪個(gè)任務(wù)執(zhí)行到哪個(gè)位置上了,并且實(shí)現(xiàn)安全的切換
一個(gè)任務(wù)一旦阻塞卡頓,立刻切換到另一個(gè)任務(wù)繼續(xù)執(zhí)行,保證線(xiàn)程總是忙碌的,更加充分的利用CPU,搶占更多的時(shí)間片
# 一個(gè)線(xiàn)程可以由多個(gè)協(xié)程來(lái)實(shí)現(xiàn),協(xié)程之間不會(huì)產(chǎn)生數(shù)據(jù)安全問(wèn)題
#協(xié)程模塊
# greenlet gevent的底層,協(xié)程,切換的模塊
# gevent 直接用的,gevent能提供更全面的功能
程序?qū)崿F(xiàn):
# ### 協(xié)程
"""
進(jìn)程是資源分配的最小單位
線(xiàn)程是程序調(diào)度的最下單位
協(xié)程是線(xiàn)程實(shí)現(xiàn)的具體方式
總結(jié):
在進(jìn)程一定的情況下,開(kāi)辟多個(gè)線(xiàn)程,
在線(xiàn)程一定的情況下,創(chuàng)建多個(gè)協(xié)程,
以便提高更大的并行并發(fā)
"""
# (1) 用協(xié)程改寫(xiě)生產(chǎn)者消費(fèi)者模型
"""
def producer():
for i in range(1000):
yield i
def consumer(gen):
for i in range(10):
print( next(gen) )
gen = producer()
consumer(gen)
print("<==========>")
consumer(gen)
print("<==========>")
consumer(gen)
"""
# (2) greenlet 協(xié)程的早期版本
from greenlet import greenlet
import time
""" switch 可以切換任務(wù),但是需要手動(dòng)切換"""
"""
def eat():
print("eat1")
g2.switch()
time.sleep(3)
print("eat2")
def play():
print("play1")
time.sleep(3)
print("play2")
g1.switch()
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()
"""
# (3) 升級(jí)到gevent版本
"""自動(dòng)進(jìn)行任務(wù)上的切換,但是不能識(shí)別阻塞"""
"""
import gevent
def eat():
print("eat1")
gevent.sleep(3)
# time.sleep(3)
print("eat2")
def play():
print("play1")
gevent.sleep(3)
# time.sleep(3)
print("play2")
# 利用gevent.spawn創(chuàng)建協(xié)程對(duì)象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn創(chuàng)建協(xié)程對(duì)象g2
g2 = gevent.spawn(play)
# 如果不加join, 主線(xiàn)程直接結(jié)束任務(wù),不會(huì)默認(rèn)等待協(xié)程任務(wù).
# 阻塞,必須等待g1任務(wù)完成之后在放行
g1.join()
# 阻塞,必須等待g2任務(wù)完成之后在放行
g2.join()
print("主線(xiàn)程執(zhí)行結(jié)束 .... ")
"""
# (4) 協(xié)程的終極版本;
from gevent import monkey;monkey.patch_all()
"""引入猴子補(bǔ)丁,可以實(shí)現(xiàn)所有的阻塞全部識(shí)別"""
import time
import gevent
def eat():
print("eat1")
time.sleep(3)
print("eat2")
def play():
print("play1")
time.sleep(3)
print("play2")
# 利用gevent.spawn創(chuàng)建協(xié)程對(duì)象g1
g1 = gevent.spawn(eat)
# 利用gevent.spawn創(chuàng)建協(xié)程對(duì)象g2
g2 = gevent.spawn(play)
# 如果不加join, 主線(xiàn)程直接結(jié)束任務(wù),不會(huì)默認(rèn)等待協(xié)程任務(wù).
# 阻塞,必須等待g1任務(wù)完成之后在放行
g1.join()
# 阻塞,必須等待g2任務(wù)完成之后在放行
g2.join()
print(" 主線(xiàn)程執(zhí)行結(jié)束 ... ")
"""
# 分號(hào),利用分號(hào)可以把多行代碼放在一行進(jìn)行編寫(xiě);
a = 1
b = 2
a = 1;b = 2
"""
==理解:==一個(gè)線(xiàn)程上有好多任務(wù),協(xié)程可以記住每個(gè)任務(wù)完成的狀態(tài),比如做飯的時(shí)候做到一半的時(shí)候停下來(lái),去掃地,掃完地之后拐回來(lái)做飯,從做到一半的時(shí)候開(kāi)始做。

小提示: 下載gevent包,會(huì)自帶greenlet

早期版本的想到在time.sleep執(zhí)行了兩次,每次執(zhí)行了一秒鐘,切換回來(lái)有執(zhí)行了一秒,這是模擬早期版本,模擬堵塞

總結(jié):
p.shutdown() 這里的shutdown類(lèi)似于join 生成器在實(shí)例化對(duì)象的時(shí)候,里面的代碼是不走的,調(diào)用的時(shí)候才有,next 調(diào)用等 單線(xiàn)程實(shí)現(xiàn)的一種異步并發(fā)的一種結(jié)構(gòu) 協(xié)程能記住任務(wù)的狀態(tài)
本篇文章就到這里了,希望能夠給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
使用Django和Python創(chuàng)建Json response的方法
下面小編就為大家分享一篇使用Django和Python創(chuàng)建Json response的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-03-03
Python 用Redis簡(jiǎn)單實(shí)現(xiàn)分布式爬蟲(chóng)的方法
本篇文章主要介紹了Python 用Redis簡(jiǎn)單實(shí)現(xiàn)分布式爬蟲(chóng)的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-11-11
使用Anaconda3建立虛擬獨(dú)立的python2.7環(huán)境方法
今天小編就為大家分享一篇使用Anaconda3建立虛擬獨(dú)立的python2.7環(huán)境方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06
Python 強(qiáng)大的信號(hào)庫(kù) blinker 入門(mén)詳細(xì)教程
這篇文章主要介紹了Python 強(qiáng)大的信號(hào)庫(kù) blinker 入門(mén)教程,信號(hào)的特點(diǎn)就是發(fā)送端通知訂閱者發(fā)生了什么,使用信號(hào)分為 3 步:定義信號(hào),監(jiān)聽(tīng)信號(hào),發(fā)送信號(hào),需要的朋友可以參考下2022-02-02
Python實(shí)現(xiàn)檢測(cè)服務(wù)器是否可以ping通的2種方法
這篇文章主要介紹了Python實(shí)現(xiàn)檢測(cè)服務(wù)器是否可以ping通的2種方法,本文分別講解了使用ping和fping命令檢測(cè)服務(wù)器是否可以ping通,需要的朋友可以參考下2015-01-01
Python使用pyodbc訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)操作方法詳解
這篇文章主要介紹了Python使用pyodbc訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)操作方法,結(jié)合實(shí)例形式詳細(xì)分析了Python基于pyodbc針對(duì)數(shù)據(jù)庫(kù)的連接、查詢(xún)、插入、修改、刪除等操作技巧與注意事項(xiàng),需要的朋友可以參考下2018-07-07
使用Python批量處理Excel文件并轉(zhuǎn)為csv文件示例
這篇文章主要介紹了使用Python批量處理Excel文件并轉(zhuǎn)為csv文件示例,文中通過(guò)代碼示例給大家介紹非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-12-12
python logging重復(fù)記錄日志問(wèn)題的解決方法
python的logging模塊是python使用過(guò)程中打印日志的利器,下面這篇文章主要給大家介紹了關(guān)于python logging重復(fù)記錄日志問(wèn)題的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2018-07-07

