Python與Redis的連接教程
今天在寫zabbix storm job監(jiān)控腳本的時候用到了python的redis模塊,之前也有用過,但是沒有過多的了解,今天看了下相關的api和源碼,看到有ConnectionPool的實現(xiàn),這里簡單說下。
在ConnectionPool之前,如果需要連接redis,我都是用StrictRedis這個類,在源碼中可以看到這個類的具體解釋:
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx()
有了ConnectionPool這個類之后,可以使用如下方法
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx) r = redis.Redis(connection_pool=pool)
這里Redis是StrictRedis的子類
簡單分析如下:
在StrictRedis類的__init__方法中,可以初始化connection_pool這個參數(shù),其對應的是一個ConnectionPool的對象:
class StrictRedis(object):
........
def __init__(self, host='localhost', port=6379,
db=0, password=None, socket_timeout=None,
socket_connect_timeout=None,
socket_keepalive=None, socket_keepalive_options=None,
connection_pool=None, unix_socket_path=None,
encoding='utf-8', encoding_errors='strict',
charset=None, errors=None,
decode_responses=False, retry_on_timeout=False,
ssl=False, ssl_keyfile=None, ssl_certfile=None,
ssl_cert_reqs=None, ssl_ca_certs=None):
if not connection_pool:
..........
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
在StrictRedis的實例執(zhí)行具體的命令時會調(diào)用execute_command方法,這里可以看到具體實現(xiàn)是從連接池中獲取一個具體的連接,然后執(zhí)行命令,完成后釋放連接:
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
connection = pool.get_connection(command_name, **options) #調(diào)用ConnectionPool.get_connection方法獲取一個連接
try:
connection.send_command(*args) #命令執(zhí)行,這里為Connection.send_command
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
pool.release(connection) #調(diào)用ConnectionPool.release釋放連接
在來看看ConnectionPool類:
class ConnectionPool(object):
...........
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs): #類初始化時調(diào)用構(gòu)造函數(shù)
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0: #判斷輸入的max_connections是否合法
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class #設置對應的參數(shù)
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.reset() #初始化ConnectionPool 時的reset操作
def reset(self):
self.pid = os.getpid()
self._created_connections = 0 #已經(jīng)創(chuàng)建的連接的計數(shù)器
self._available_connections = [] #聲明一個空的數(shù)組,用來存放可用的連接
self._in_use_connections = set() #聲明一個空的集合,用來存放已經(jīng)在用的連接
self._check_lock = threading.Lock()
.......
def get_connection(self, command_name, *keys, **options): #在連接池中獲取連接的方法
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop() #獲取并刪除代表連接的元素,在第一次獲取connectiong時,因為_available_connections是一個空的數(shù)組,
會直接調(diào)用make_connection方法
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection) #向代表正在使用的連接的集合中添加元素
return connection
def make_connection(self): #在_available_connections數(shù)組為空時獲取連接調(diào)用的方法
"Create a new connection"
if self._created_connections >= self.max_connections: #判斷創(chuàng)建的連接是否已經(jīng)達到最大限制,max_connections可以通過參數(shù)初始化
raise ConnectionError("Too many connections")
self._created_connections += 1 #把代表已經(jīng)創(chuàng)建的連接的數(shù)值+1
return self.connection_class(**self.connection_kwargs) #返回有效的連接,默認為Connection(**self.connection_kwargs)
def release(self, connection): #釋放連接,鏈接并沒有斷開,只是存在鏈接池中
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection) #從集合中刪除元素
self._available_connections.append(connection) #并添加到_available_connections 的數(shù)組中
def disconnect(self): #斷開所有連接池中的鏈接
"Disconnects all connections in the pool"
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()
execute_command最終調(diào)用的是Connection.send_command方法,關閉鏈接為 Connection.disconnect方法,而Connection類的實現(xiàn):
class Connection(object):
"Manages TCP communication to and from a Redis server"
def __del__(self): #對象刪除時的操作,調(diào)用disconnect釋放連接
try:
self.disconnect()
except Exception:
pass
核心的鏈接建立方法是通過socket模塊實現(xiàn):
def _connect(self):
err = None
for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP_KEEPALIVE
if self.socket_keepalive: #構(gòu)造函數(shù)中默認 socket_keepalive=False,因此這里默認為短連接
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in iteritems(self.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# set the socket_connect_timeout before we connect
sock.settimeout(self.socket_connect_timeout) #構(gòu)造函數(shù)中默認socket_connect_timeout=None,即連接為blocking的模式
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
sock.settimeout(self.socket_timeout) #構(gòu)造函數(shù)中默認socket_timeout=None
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
.....
關閉鏈接的方法:
def disconnect(self):
"Disconnects from the Redis server"
self._parser.on_disconnect()
if self._sock is None:
return
try:
self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close
self._sock.close()
except socket.error:
pass
self._sock = None
可以小結(jié)如下
1)默認情況下每創(chuàng)建一個Redis實例都會構(gòu)造出一個ConnectionPool實例,每一次訪問redis都會從這個連接池得到一個連接,操作完成后會把該連接放回連接池(連接并沒有釋放),可以構(gòu)造一個統(tǒng)一的ConnectionPool,在創(chuàng)建Redis實例時,可以將該ConnectionPool傳入,那么后續(xù)的操作會從給定的ConnectionPool獲得連接,不會再重復創(chuàng)建ConnectionPool。
2)默認情況下沒有設置keepalive和timeout,建立的連接是blocking模式的短連接。
3)不考慮底層tcp的情況下,連接池中的連接會在ConnectionPool.disconnect中統(tǒng)一銷毀。
- Redis總結(jié)筆記(二):C#連接Redis簡單例子
- RedisDesktopManager無法遠程連接Redis的完美解決方法
- springboot2整合redis使用lettuce連接池的方法(解決lettuce連接池無效問題)
- redis客戶端連接錯誤 NOAUTH Authentication required
- Redis連接超時異常的處理方法
- 詳解springboot配置多個redis連接
- 詳解Redis開啟遠程登錄連接
- Springboot2.X集成redis集群(Lettuce)連接的方法
- redis連接報錯error:NOAUTH Authentication required
- redis連接被拒絕的解決方案
- redis-copy使用6379端口無法連接到Redis服務器的問題
相關文章
聊聊通過celery_one避免Celery定時任務重復執(zhí)行的問題
Celery Once 也是利用 Redis 加鎖來實現(xiàn), Celery Once 在 Task 類基礎上實現(xiàn)了 QueueOnce 類,該類提供了任務去重的功能,今天通過本文給大家介紹通過celery_one避免Celery定時任務重復執(zhí)行的問題,感興趣的朋友一起看看吧2021-10-10
利用python模擬實現(xiàn)POST請求提交圖片的方法
最近在利用python做接口測試,其中有個上傳圖片的接口,在網(wǎng)上各種搜索,各種嘗試。下面這篇文章主要給大家介紹了關于利用python模擬實現(xiàn)POST請求提交圖片的相關資料,需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07
Python隨機數(shù)種子(random seed)的設置小結(jié)
隨機數(shù)種子是控制偽隨機數(shù)生成器的初始值,通過設置相同的種子,可以確保隨機數(shù)序列的一致性,本文主要介紹了Python隨機數(shù)種子(random seed)的設置,感興趣的可以了解一下2025-03-03
Python 網(wǎng)絡編程之TCP客戶端/服務端功能示例【基于socket套接字】
這篇文章主要介紹了Python 網(wǎng)絡編程之TCP客戶端/服務端功能,結(jié)合實例形式分析了Python使用socket套接字實現(xiàn)TCP協(xié)議下的客戶端與服務器端數(shù)據(jù)傳輸操作技巧,需要的朋友可以參考下2019-10-10
django-rest-swagger的優(yōu)化使用方法
今天小編就為大家分享一篇django-rest-swagger的優(yōu)化使用方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08
Python爬取用戶觀影數(shù)據(jù)并分析用戶與電影之間的隱藏信息!
看電影前很多人都喜歡去 『豆瓣』 看影評,所以我爬取44130條 『豆瓣』 的用戶觀影數(shù)據(jù),分析用戶之間的關系,電影之間的聯(lián)系,以及用戶和電影之間的隱藏關系,需要的朋友可以參考下2021-06-06

