Python中Socket select基本使用介紹
一. 前言
在Python中,select 是一個用于異步I/O多路復(fù)用的模塊。它提供了一種簡單的方法,用于監(jiān)視多個文件描述符(file descriptor),以確定其中哪些文件描述符已經(jīng)就緒可讀、可寫或者發(fā)生了異常。使用 select 模塊可以實現(xiàn)一些高效的網(wǎng)絡(luò)編程,比如在服務(wù)端同時監(jiān)聽多個客戶端連接,或者在客戶端同時連接多個服務(wù)端。
二. 普通直連和使用select連接
使用select和普通的直接連接方式都可以實現(xiàn)連接,但二者有以下區(qū)別:
1. 直接連接方式:
在發(fā)送數(shù)據(jù)前,需要先建立連接,然后通過send函數(shù)發(fā)送數(shù)據(jù)。這種方式只能同時處理一個連接,無法同時處理多個連接。
2. select連接方式:
使用select可以監(jiān)聽多個文件描述符,當其中的任意一個文件描述符發(fā)生讀寫事件時,select函數(shù)就會返回并通知對應(yīng)的程序進行處理。這種方式可以同時處理多個連接,提高了系統(tǒng)的并發(fā)性能。
二. Select介紹
select 模塊暴露了3個主要的函數(shù),分別是 select.select()、select.poll() 和 select.epoll(),它們可以實現(xiàn)不同的多路復(fù)用機制。
三個函數(shù)的簡要介紹:
- select.select(rlist, wlist, xlist, timeout): 用于監(jiān)視文件描述符的變化,可以監(jiān)視讀、寫、異常事件。當這些事件中的任何一個發(fā)生時,
select函數(shù)會返回。rlist、wlist和xlist分別是要監(jiān)視的可讀、可寫和異常的文件描述符列表,timeout是超時參數(shù),單位是秒。 - select.poll():使用場景跟
select.select()相同,但性能更好,適用于監(jiān)視較大量的文件描述符。 - select.epoll():
Linux系統(tǒng)下的I/O復(fù)用機制,也是一種性能很好的多路復(fù)用機制。它相比于select.poll()
的優(yōu)點主要在于它可以支持更多的連接數(shù)。
除了以上三種函數(shù),還有 select.kevent() 和 select.kqueue() 函數(shù),它們適用于FreeBSD系統(tǒng)。
在使用 select 模塊時,需要注意以下幾點:
- 最好使用非阻塞的
socket,以避免程序在等待socket數(shù)據(jù)時被阻塞,從而可以處理其它socket數(shù)據(jù)。 select函數(shù)可能會有一些性能問題,當需要同時監(jiān)聽大量的文件描述符時,可能會導(dǎo)致 CPU 占用過高,所以使用時需要注意調(diào)優(yōu)。
三. 代碼示例
下面給出一個簡單的示例,演示如何使用 select 模塊同時監(jiān)聽多個 socket:
服務(wù)端Socket
import socket
import select
# 設(shè)置需要監(jiān)聽的 socket 地址和端口
ADDRESS = ("localhost", 9000)
# 創(chuàng)建一個服務(wù)器 socket,并綁定到指定地址和端口
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(ADDRESS)
server_socket.listen(5)
# 將 server_socket 設(shè)置為非阻塞模式
server_socket.setblocking(False)
# 存儲已連接的 client_socket
connected_clients = []
# 開始監(jiān)聽
while True:
# 使用 select 函數(shù)監(jiān)聽所有連接的 client_socket
readable_sockets, _, _ = select.select([server_socket] + connected_clients,
[], [], 1)
# 處理所有可讀的 socket
for sock in readable_sockets:
# 如果是 server_socket 表示有新的連接
if sock is server_socket:
client_socket, client_address = server_socket.accept()
connected_clients.append(client_socket)
print(f"New client connected: {client_address}")
# 否則是已連接的 client_socket,需要處理收到的數(shù)據(jù)
else:
try:
data = sock.recv(1024)
if data:
print(f"Received data from {sock.getpeername()}: {data.decode()}")
else:
# 如果收到的數(shù)據(jù)為空,表示連接已經(jīng)斷開,需要關(guān)閉 socket 并從 connected_clients 中移除
sock.close()
print(f"Client {sock.getpeername()} disconnected")
connected_clients.remove(sock)
except Exception as e:
# 出現(xiàn)異常,也需要關(guān)閉 socket 并從 connected_clients 中移除
print(f"Error occurred while receiving data from {sock.getpeername()}: {e}")
sock.close()
connected_clients.remove(sock)
在這個示例中,我們使用 select 模塊同時監(jiān)聽 server_socket 和所有 connected_clients,當有新的 client_socket 連接時,會將其添加到 connected_clients 列表中;當存在可讀的 socket 時,會根據(jù)是 server_socket 還是 client_socket 處理它們的相關(guān)操作。
需要注意的是,在處理已斷開連接的 client_socket 時,需要將其關(guān)閉并從 connected_clients 中移除,否則會一直存在于 connected_clients 中,導(dǎo)致程序出現(xiàn)意料之外的錯誤。
使用線程啟動server
import logging
import socket
from threading import Thread
import select
class SimSocketServer(Thread):
def __init__(self, addr, logger=None):
Thread.__init__(self, name="SimSocketServer")
self.connected_clients = []
self.addr = addr
self.seqNo = 1
self.socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.logger = logger
self.jsonTemplate = {
"Command": "FORWARD_ELEV_INFO",
"DeviceId": "C0002T",
}
def run(self):
self.logger.info('-----------------------------------------------------------')
self.logger.info("HC SimBattery Server addr:{} started listen...".format(self.addr))
self.logger.info('-----------------------------------------------------------')
# 創(chuàng)建一個服務(wù)器 socket,并綁定到指定地址和端口
self.socket_server.bind(self.addr)
self.socket_server.listen(5)
# 將 socket_server 設(shè)置為非阻塞模式
self.socket_server.setblocking(False)
# 存儲已連接的 client_socket
self.connected_clients = []
while True:
# 使用 select 函數(shù)監(jiān)聽所有連接的 client_socket
readable_sockets, _, _ = select.select([self.socket_server] + self.connected_clients, [], [], 1)
# 處理所有可讀的 socket
for sock in readable_sockets:
# 如果是 socket_server 表示有新的連接
if sock is self.socket_server:
client_socket, client_address = self.socket_server.accept()
self.connected_clients.append(client_socket)
self.logger.info(f"New client connected: {client_address}")
# 否則是已連接的 client_socket,需要處理收到的數(shù)據(jù)
else:
try:
data = sock.recv(1024)
if not data:
# 如果收到的數(shù)據(jù)為空,表示連接已經(jīng)斷開,需要關(guān)閉 socket 并從 connected_clients 中移除
sock.close()
self.logger.info(f"Client {sock.getpeername()} disconnected")
self.connected_clients.remove(sock)
continue
print(f"Received data from {sock.getpeername()}: {data.decode()}")
# 發(fā)送消息到所有連接的客戶端(除了發(fā)送者)
# for client in self.connected_clients:
# if client != sock:
# client.send(data)
except Exception as e:
# 出現(xiàn)異常,也需要關(guān)閉 socket 并從 connected_clients 中移除
self.logger.warn(f"Error occurred while receiving data from {sock.getpeername()}: {e}")
sock.close()
self.connected_clients.remove(sock)
def send_msg2client(self, msg):
try:
# 發(fā)送消息到所有連接的客戶端
for client in self.connected_clients:
client.send(f'hello client -> {msg}'.encode())
except Exception as e:
print('send msg error:'.format(e))
if __name__ == '__main__':
'''simulation battery socket server thread'''
simSocketServer = SimSocketServer(("127.0.0.1", 8787), logging)
simSocketServer.start()
while True:
try:
pc_cmd = input('輸入命令:\n')
print("pc_cmd", pc_cmd)
simSocketServer.send_msg2client(pc_cmd)
except Exception as e:
print(f'cmd error {e}')
客戶端Socket
import socket
import select
import sys
# 創(chuàng)建5個socket對象,用于連接5個不同的服務(wù)端
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s3 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s4 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s5 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 設(shè)置服務(wù)端地址和端口號
server1_address = ('localhost', 9000)
server2_address = ('localhost', 9001)
server3_address = ('localhost', 9002)
server4_address = ('localhost', 9003)
server5_address = ('localhost', 9004)
# 將socket對象添加到列表中
sockets = [s1, s2, s3, s4, s5]
# 連接服務(wù)端
for sock, address in zip(sockets, [server1_address, server2_address, server3_address, server4_address, server5_address]):
try:
sock.connect(address)
except Exception as e:
print(f"Exception: {e}")
sys.exit()
while True:
# 使用select函數(shù)監(jiān)控所有socket對象
ready_to_read, _, _ = select.select(sockets, [], [])
# 在任何一個可讀socket對象中讀取數(shù)據(jù)
for sock in ready_to_read:
try:
data = sock.recv(1024)
if data:
print(f"Received: {data}")
else:
socket.close()
sockets.remove(sock)
except Exception as e:
sockets.remove(sock)
print(f"Exception: {e}")
sock.close()
我們使用select函數(shù)監(jiān)視所有socket對象。如果有數(shù)據(jù)可讀,我們就會讀取數(shù)據(jù)并打印出來。如果在接收數(shù)據(jù)時發(fā)生異常,我們將從sockets列表中刪除該socket對象并關(guān)閉它。
四. 多個服務(wù)端和多個客戶端使用select代碼示例
服務(wù)端
使用多線程啟動多個服務(wù)端
下面是服務(wù)端代碼的示例,它會同時監(jiān)聽8001, 8002和8003端口。當客戶端連接成功后會向客戶端發(fā)送一條歡迎消息,當客戶端發(fā)送信息時會原樣返回給客戶端,當客戶端關(guān)閉連接時,服務(wù)器也會關(guān)閉相應(yīng)的socket。
import socket
import threading
# 設(shè)置服務(wù)端地址和端口號
SERVER_ADDRESS = "localhost"
SERVER_PORT = 8001
# 監(jiān)聽的隊列大小
LISTEN_QUEUE_SIZE = 5
# 消息歡迎消息
WELCOME_MESSAGE = "Welcome to server!"
# 為每個客戶端建立相應(yīng)的socket連接
def handle_client(client_socket, client_address):
print(f"New connection from {client_address}")
client_socket.send(WELCOME_MESSAGE.encode())
while True:
try:
data = client_socket.recv(1024)
if data:
print(f"Received message from {client_address}: {data.decode()}")
client_socket.send(data)
else:
# 關(guān)閉客戶端連接
client_socket.close()
print(f"Connection closed by {client_address}")
break
except Exception as e:
print(f"Error encountered while receiving data from {client_address}: {e}")
client_socket.close()
break
# 監(jiān)聽多個socket
def listen(address, connections):
# 創(chuàng)建socket連接
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(address)
server_socket.listen(LISTEN_QUEUE_SIZE)
print(f"Listening on {address[0]}:{address[1]}")
while True:
# 等待客戶端連接
client_socket, client_address = server_socket.accept()
print(f"Accepted new connection from {client_address}")
# 為客戶端啟動線程
thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
thread.daemon = True
thread.start()
# 將客戶端的socket連接保存
connections.append(client_socket)
# 啟動監(jiān)聽
connections = []
threads = []
for port in range(SERVER_PORT, SERVER_PORT + 3):
address = (SERVER_ADDRESS, port)
thread = threading.Thread(target=listen, args=(address, connections))
threads.append(thread)
thread.start()
# 等待所有線程結(jié)束
for thread in threads:
thread.join()
# 關(guān)閉所有連接
for connection in connections:
connection.close()
在這個示例代碼中,我們使用了Python中的多線程來同時監(jiān)聽8001, 8002和8003端口的客戶端連接,這個方式更加靈活和高效。handle_client()函數(shù)用于處理每個客戶端連接,它會向客戶端發(fā)送一條歡迎消息,并在客戶端發(fā)送數(shù)據(jù)時原樣返回給客戶端。listen()函數(shù)用于監(jiān)聽一個端口并為每個客戶端連接啟動一個新線程處理。最后在主線程中,我們啟動了三個線程分別監(jiān)聽不同的端口,等待所有線程結(jié)束并關(guān)閉所有連接。
客戶端
下面的客戶端程序可以連接多個服務(wù)端并同時監(jiān)聽每個服務(wù)端返回的消息。代碼中處理了連接、發(fā)送和接收數(shù)據(jù)時可能出現(xiàn)的異常情況。
import socket
import time
import traceback
import select
# 設(shè)置服務(wù)端地址列表
SERVER_ADDRESSES = [("localhost", 8001), ("localhost", 8002), ("localhost", 8003)]
def set_socket(server_address):
sockets = []
for server_addr in server_address:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.setblocking(True)
sockets.append(client_socket)
return sockets
def start_client(server_address):
sockets = set_socket(server_address)
# 連接服務(wù)端
for index, client_socket in enumerate(sockets):
server_address = SERVER_ADDRESSES[index]
try:
client_socket.connect(server_address)
except Exception as e:
print(f"Connect to {server_address} failed: {e}")
start_listen(sockets)
def start_listen(sockets):
# 使用select監(jiān)聽服務(wù)端
while True:
try:
# 僅監(jiān)聽已連接的socket,獲取到了數(shù)據(jù)則接收數(shù)據(jù)
readable, writeable, errors = select.select(sockets, [], sockets)
for socket in readable:
try:
data = socket.recv(1024)
if data:
print(f"Received message from {socket.getpeername()}: {data.decode()}")
else:
# 當對端關(guān)閉連接時,對應(yīng)的可讀socket也會被認為是可寫的
# 并且其recv方法將返回空字節(jié)流
sockets.remove(socket)
except Exception as e:
print(f"Error encountered while receiving data: {e}")
sockets.remove(socket)
for socket in errors:
print(f"Error encountered on {socket.getpeername()}")
sockets.remove(socket)
except Exception as e:
traceback.print_exc()
time.sleep(1)
start_client(SERVER_ADDRESSES)
break
if __name__ == '__main__':
# 創(chuàng)建socket連接
start_client(SERVER_ADDRESSES)
五. 總結(jié)
在實際應(yīng)用中,select的作用和好處在于它能夠使程序監(jiān)聽多個連接,解決了直接連接方式只能處理單個連接的問題。這種方式可以讓程序在處理一個連接時,同時監(jiān)聽其他連接,使得程序的并發(fā)性能得到大幅提升,適用于高并發(fā)環(huán)境下的網(wǎng)絡(luò)通信場景。
總之,select方式適用于多連接場景,可以提高程序的運行效率。而直接連接方式適用于簡單的單連接場景。
到此這篇關(guān)于Python中Socket select基本使用介紹的文章就介紹到這了,更多相關(guān)Python Socket select內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用Python實現(xiàn)Excel報表自動化的操作指南
在職場中,數(shù)據(jù)分析的終極目標往往是生成一份清晰、專業(yè)的報表,數(shù)據(jù)分析完成,卻要花費大量時間手工制圖,制作出的報表枯燥無味,只有密密麻麻的數(shù)字,缺乏可視化圖表的直觀沖擊力,今天,我將帶你進入Python報表自動化的終極世界,如何利用Python實現(xiàn)Excel報表自動化2025-11-11
如何在conda虛擬環(huán)境中配置cuda+cudnn+pytorch深度學(xué)習(xí)環(huán)境
這篇文章主要介紹了如何在conda虛擬環(huán)境中配置cuda+cudnn+pytorch深度學(xué)習(xí)環(huán)境,想在服務(wù)器上配置深度學(xué)習(xí)的環(huán)境,看了很多資料后總結(jié)出來了對于新手比較友好的配置流程,需要的朋友可以參考下2023-03-03

