Python gevent協(xié)程切換實現(xiàn)詳解
一、背景
大家都知道gevent的機制是單線程+協(xié)程機制,當遇到可能會阻塞的操作時,就切換到可運行的協(xié)程中繼續(xù)運行,以此來實現(xiàn)提交系統(tǒng)運行效率的目標,但是具體是怎么實現(xiàn)的呢?讓我們直接從代碼中看一下吧。
二、切換機制
讓我們從socket的send、recv方法入手:
def recv(self, *args):
while 1:
try:
return self._sock.recv(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
# QQQ without clearing exc_info test__refcount.test_clean_exit fails
sys.exc_clear()
self._wait(self._read_event)
這里會開啟一個死循環(huán),在循環(huán)中調(diào)用self._sock.recv()方法,并捕獲異常,當錯誤是EWOULDBLOCK時,則調(diào)用self._wait(self._read_event)方法,該方法其實是:_wait = _wait_on_socket,_wait_on_socket方法的定義在文件:_hub_primitives.py中,如下:
# Suitable to be bound as an instance method
def wait_on_socket(socket, watcher, timeout_exc=None):
if socket is None or watcher is None:
# test__hub TestCloseSocketWhilePolling, on Python 2; Python 3
# catches the EBADF differently.
raise ConcurrentObjectUseError("The socket has already been closed by another greenlet")
_primitive_wait(watcher, socket.timeout,
timeout_exc if timeout_exc is not None else _NONE,
socket.hub)
該方法其實是調(diào)用了函數(shù):_primitive_wait(),其仍然在文件:_hub_primitives.py中定義,如下:
def _primitive_wait(watcher, timeout, timeout_exc, hub):
if watcher.callback is not None:
raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
% (watcher.callback, ))
if hub is None:
hub = get_hub()
if timeout is None:
hub.wait(watcher)
return
timeout = Timeout._start_new_or_dummy(
timeout,
(timeout_exc
if timeout_exc is not _NONE or timeout is None
else _timeout_error('timed out')))
with timeout:
hub.wait(watcher)
這里其實是調(diào)用了hub.wait()函數(shù),該函數(shù)的定義在文件_hub.py中,如下:
class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable
def wait(self, watcher):
"""
Wait until the *watcher* (which must not be started) is ready.
The current greenlet will be unscheduled during this time.
"""
waiter = Waiter(self) # pylint:disable=undefined-variable
watcher.start(waiter.switch, waiter)
try:
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError(
'Invalid switch into %s: got %r (expected %r; waiting on %r with %r)' % (
getcurrent(), # pylint:disable=undefined-variable
result,
waiter,
self,
watcher
)
)
finally:
watcher.stop()
watcher.stop()
該類WaitOperationsGreenlet是Hub的基類,其方法wait中的邏輯是:生成一個Waiter對象,并調(diào)用watcher.start(waiter.switch, waiter)方法,watcher是最開始recv方法中使用的self._read_event,watcher是gevent的底層事件框架libev中的概念;同時還有一個waiter對象,它類似與python中的future概念,該對象有一個switch()方法以及get()方法,當沒有得到結(jié)果沒有準備好時,調(diào)用waiter.get()方法回導(dǎo)致協(xié)程被掛起;get()函數(shù)的定義如下:
def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE:
if self._exception is None:
return self.value
getcurrent().throw(*self._exception) # pylint:disable=undefined-variable
else:
if self.greenlet is not None:
raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
self.greenlet = getcurrent() # pylint:disable=undefined-variable
try:
return self.hub.switch()
finally:
self.greenlet = None
在get()中最關(guān)鍵的是self.hub.switch()函數(shù),該函數(shù)將執(zhí)行權(quán)轉(zhuǎn)移到hub,并繼續(xù)運行,至此已經(jīng)分析完了當在worker協(xié)程中從網(wǎng)絡(luò)獲取數(shù)據(jù)遇到阻塞時,如何避免阻塞并切換到hub中的實現(xiàn),至于何時再切換會worker協(xié)程,我們后續(xù)再繼續(xù)分析。
總結(jié)
要記得gevent中一個重要的概念,協(xié)程切換不是調(diào)用而是執(zhí)行權(quán)的轉(zhuǎn)移,從可能會阻塞的協(xié)程切換到hub,并由hub在合適的時機切換到另一個可以繼續(xù)運行的協(xié)程繼續(xù)執(zhí)行;gevent通過這種形式實現(xiàn)了提高io密集型應(yīng)用吞吐率的目標。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python實現(xiàn)指定文件夾下的指定文件移動到指定位置
這篇文章主要為大家詳細介紹了python實現(xiàn)指定文件夾下的指定文件移動到指定位置,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-09-09
Pandas中DataFrame.replace()函數(shù)的實現(xiàn)
DataFrame.replace()用于替換DataFrame中的指定值,本文主要介紹了Pandas中DataFrame.replace()函數(shù)的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-07-07
Django+simpleui實現(xiàn)文件上傳預(yù)覽功能(詳細過程)
該文章詳細介紹了如何在Django框架中實現(xiàn)文件上傳、預(yù)覽和下載功能,并使用SimpleUI美化Django后臺界面,通過創(chuàng)建模型、表單、視圖和配置URL,實現(xiàn)了文件的存儲和管理,同時,文章還提到了配置媒體文件、創(chuàng)建模板以及在生產(chǎn)環(huán)境中的部署注意事項,感興趣的朋友一起看看吧2025-02-02
Centos 升級到python3后pip 無法使用的解決方法
今天小編就為大家分享一篇Centos 升級到python3后pip 無法使用的解決方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06
Python腳本在Appium庫上對移動應(yīng)用實現(xiàn)自動化測試
這篇文章主要介紹了使用Python的Appium庫對移動應(yīng)用實現(xiàn)自動化測試的教程,屬于Python腳本的一個自動化應(yīng)用,需要的朋友可以參考下2015-04-04
詳解分布式系統(tǒng)中如何用python實現(xiàn)Paxos
提到分布式算法,就不得不提 Paxos 算法,在過去幾十年里,它基本上是分布式共識的代 名詞,因為當前最常用的一批共識算法都是基于它改進的。比如,F(xiàn)ast Paxos 算法、 Cheap Paxos 算法、Raft 算法、ZAB 協(xié)議等等。2021-05-05
Python如何統(tǒng)計函數(shù)調(diào)用的耗時
這篇文章主要為大家詳細介紹了如何使用Python實現(xiàn)統(tǒng)計函數(shù)調(diào)用的耗時,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-04-04

