PySerial 串口通信的多種實(shí)現(xiàn)方法
PySerial 是一個 Python 串口通信庫,可以用于與各種串口設(shè)備(如 Arduino、傳感器、嵌入式設(shè)備等)進(jìn)行通信。
1. PySerial 簡介
PySerial 提供了跨平臺的串口通信功能,支持:
- Windows、Linux、macOS
- 多種串口設(shè)置(波特率、數(shù)據(jù)位、停止位、校驗(yàn)位等)
- 同步和異步通信
- 二進(jìn)制和文本數(shù)據(jù)
2. 安裝和基礎(chǔ)使用
2.1 安裝
pip install pyserial
2.2 基礎(chǔ)示例
import serial
import time
# 基礎(chǔ)串口通信示例
try:
# 打開串口
ser = serial.Serial('COM3', 9600, timeout=1)
print(f"串口已打開: {ser.name}")
# 等待設(shè)備初始化
time.sleep(2)
# 發(fā)送數(shù)據(jù)
ser.write(b'Hello Arduino!\n')
print("數(shù)據(jù)已發(fā)送")
# 讀取數(shù)據(jù)
response = ser.readline().decode('utf-8').strip()
print(f"收到響應(yīng): {response}")
# 關(guān)閉串口
ser.close()
print("串口已關(guān)閉")
except serial.SerialException as e:
print(f"串口錯誤: {e}")
except Exception as e:
print(f"其他錯誤: {e}")
3. 串口配置和打開
3.1 查找可用串口
import serial
import serial.tools.list_ports
def list_serial_ports():
"""列出所有可用的串口"""
ports = serial.tools.list_ports.comports()
if not ports:
print("沒有找到可用的串口")
return []
print("可用串口列表:")
for i, port in enumerate(ports, 1):
print(f"{i}. {port.device} - {port.description}")
print(f" 硬件ID: {port.hwid}")
print(f" 制造商: {port.manufacturer}")
print(f" 產(chǎn)品: {port.product}")
print()
return [port.device for port in ports]
# 使用示例
available_ports = list_serial_ports()
3.2 串口配置參數(shù)
import serial
# 完整的串口配置
serial_config = {
'port': 'COM3', # 串口號 (Windows: COM3, Linux: /dev/ttyUSB0, macOS: /dev/tty.usbserial)
'baudrate': 9600, # 波特率 (常用: 9600, 115200, 57600, 38400)
'bytesize': serial.EIGHTBITS, # 數(shù)據(jù)位 (可選: FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
'parity': serial.PARITY_NONE, # 校驗(yàn)位 (可選: PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
'stopbits': serial.STOPBITS_ONE, # 停止位 (可選: STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)
'timeout': 1, # 讀取超時時間(秒),None表示阻塞讀取,0表示非阻塞
'xonxoff': False, # 軟件流控制
'rtscts': False, # 硬件(RTS/CTS)流控制
'dsrdtr': False, # 硬件(DSR/DTR)流控制
'write_timeout': 2, # 寫入超時時間
}
# 打開串口
try:
ser = serial.Serial(**serial_config)
print(f"串口配置: {ser}")
# 獲取當(dāng)前配置
print(f"當(dāng)前波特率: {ser.baudrate}")
print(f"當(dāng)前數(shù)據(jù)位: {ser.bytesize}")
print(f"當(dāng)前校驗(yàn)位: {ser.parity}")
print(f"當(dāng)前停止位: {ser.stopbits}")
ser.close()
except serial.SerialException as e:
print(f"無法打開串口: {e}")
3.3 自動檢測和連接串口
import serial
import serial.tools.list_ports
import time
def auto_connect_serial(baudrate=9600, timeout=1):
"""
自動檢測并連接串口
返回: serial.Serial 對象 或 None
"""
ports = serial.tools.list_ports.comports()
if not ports:
print("未找到可用串口")
return None
for port in ports:
try:
print(f"嘗試連接: {port.device} - {port.description}")
# 嘗試連接
ser = serial.Serial(
port=port.device,
baudrate=baudrate,
timeout=timeout
)
# 測試連接
time.sleep(2) # 等待設(shè)備初始化
# 清空緩沖區(qū)
ser.reset_input_buffer()
ser.reset_output_buffer()
# 發(fā)送測試命令
ser.write(b'AT\r\n')
time.sleep(0.1)
# 檢查響應(yīng)
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f"設(shè)備響應(yīng): {response}")
print(f"成功連接到: {port.device}")
return ser
except serial.SerialException:
print(f"連接 {port.device} 失敗,嘗試下一個...")
continue
print("所有串口連接嘗試失敗")
return None
# 使用示例
ser = auto_connect_serial()
if ser:
# 進(jìn)行通信操作
ser.write(b'Hello!\n')
ser.close()
4. 數(shù)據(jù)讀寫操作
4.1 基本讀寫操作
import serial
import time
def basic_read_write_example():
"""基本讀寫操作示例"""
try:
# 打開串口
ser = serial.Serial('COM3', 9600, timeout=1)
print("串口已打開")
# 等待設(shè)備就緒
time.sleep(2)
# 清空緩沖區(qū)
ser.reset_input_buffer()
ser.reset_output_buffer()
# 方法1: 寫入字符串 (需要編碼為字節(jié))
command = "AT+VER?\r\n"
ser.write(command.encode('utf-8'))
print(f"發(fā)送命令: {command}")
# 方法2: 直接寫入字節(jié)
ser.write(b'AT+ID?\r\n')
# 讀取數(shù)據(jù)
time.sleep(0.5) # 等待響應(yīng)
# 方法1: 讀取指定字節(jié)數(shù)
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
print(f"原始數(shù)據(jù): {data}")
print(f"解碼后: {data.decode('utf-8', errors='ignore')}")
# 方法2: 讀取一行
ser.write(b'AT+NAME?\r\n')
line = ser.readline().decode('utf-8', errors='ignore').strip()
print(f"讀取一行: {line}")
# 方法3: 讀取所有可用數(shù)據(jù)
ser.write(b'AT+HELP\r\n')
time.sleep(1)
while ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
print(f"收到數(shù)據(jù): {data.decode('utf-8', errors='ignore')}")
ser.close()
except Exception as e:
print(f"錯誤: {e}")
# 運(yùn)行示例
basic_read_write_example()
4.2 文本模式通信
import serial
def text_communication_example():
"""文本模式通信示例"""
try:
ser = serial.Serial('COM3', 9600, timeout=1)
# 發(fā)送文本命令
commands = [
"LED ON\n",
"GET TEMP\n",
"SET SPEED 100\n",
"STATUS\n"
]
for cmd in commands:
print(f"發(fā)送: {cmd.strip()}")
ser.write(cmd.encode('utf-8'))
# 讀取響應(yīng)
response = ser.readline().decode('utf-8', errors='ignore').strip()
print(f"響應(yīng): {response}")
# 處理特定響應(yīng)
if "TEMP" in response:
temp_value = response.split(':')[1].strip()
print(f"溫度值: {temp_value}°C")
ser.close()
except Exception as e:
print(f"錯誤: {e}")
# 運(yùn)行示例
text_communication_example()
4.3 二進(jìn)制數(shù)據(jù)通信
import serial
import struct
import time
def binary_communication_example():
"""二進(jìn)制數(shù)據(jù)通信示例"""
try:
ser = serial.Serial('COM3', 115200, timeout=1)
# 示例1: 發(fā)送二進(jìn)制命令
# 假設(shè)協(xié)議: 起始字節(jié)(0xAA) + 命令字節(jié) + 數(shù)據(jù)長度 + 數(shù)據(jù) + 校驗(yàn)和
# 構(gòu)建數(shù)據(jù)包
start_byte = 0xAA
command = 0x01 # 讀取數(shù)據(jù)命令
data = [0x10, 0x20, 0x30]
data_length = len(data)
# 計(jì)算校驗(yàn)和
checksum = (start_byte + command + data_length + sum(data)) & 0xFF
# 打包數(shù)據(jù)
packet = struct.pack('BBB', start_byte, command, data_length)
packet += bytes(data)
packet += struct.pack('B', checksum)
print(f"發(fā)送數(shù)據(jù)包: {packet.hex()}")
ser.write(packet)
# 等待響應(yīng)
time.sleep(0.1)
# 讀取響應(yīng)
if ser.in_waiting >= 10: # 假設(shè)響應(yīng)包長度至少10字節(jié)
response = ser.read(ser.in_waiting)
print(f"收到響應(yīng): {response.hex()}")
# 解析響應(yīng)
if len(response) >= 3:
# 解析前3個字節(jié)
start, cmd, length = struct.unpack('BBB', response[:3])
print(f"響應(yīng): 起始={hex(start)}, 命令={hex(cmd)}, 長度={length}")
# 示例2: 發(fā)送浮點(diǎn)數(shù)
float_value = 3.14159
float_bytes = struct.pack('f', float_value)
ser.write(b'FLOAT')
ser.write(float_bytes)
print(f"發(fā)送浮點(diǎn)數(shù): {float_value} -> {float_bytes.hex()}")
ser.close()
except Exception as e:
print(f"錯誤: {e}")
# 運(yùn)行示例
binary_communication_example()
4.4 持續(xù)數(shù)據(jù)讀取
import serial
import threading
import time
class SerialMonitor:
"""串口監(jiān)視器,持續(xù)讀取數(shù)據(jù)"""
def __init__(self, port, baudrate=9600):
self.ser = serial.Serial(port, baudrate, timeout=1)
self.running = False
self.thread = None
def start_monitoring(self):
"""開始監(jiān)視串口數(shù)據(jù)"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._monitor_loop)
self.thread.start()
print("串口監(jiān)視器已啟動")
def _monitor_loop(self):
"""監(jiān)視循環(huán)"""
while self.running:
try:
# 檢查是否有數(shù)據(jù)可讀
if self.ser.in_waiting > 0:
# 讀取一行數(shù)據(jù)
line = self.ser.readline()
try:
decoded = line.decode('utf-8').strip()
print(f"[串口數(shù)據(jù)] {decoded}")
# 這里可以添加數(shù)據(jù)處理邏輯
self._process_data(decoded)
except UnicodeDecodeError:
# 如果是二進(jìn)制數(shù)據(jù),以十六進(jìn)制顯示
print(f"[二進(jìn)制數(shù)據(jù)] {line.hex()}")
# 短暫休眠,避免占用過多CPU
time.sleep(0.01)
except Exception as e:
print(f"讀取數(shù)據(jù)時出錯: {e}")
time.sleep(1)
def _process_data(self, data):
"""處理接收到的數(shù)據(jù)"""
# 示例:處理特定格式的數(shù)據(jù)
if data.startswith("TEMP:"):
temp = data.split(":")[1]
print(f"檢測到溫度數(shù)據(jù): {temp}°C")
elif data.startswith("HUMI:"):
humi = data.split(":")[1]
print(f"檢測到濕度數(shù)據(jù): {humi}%")
def send_command(self, command):
"""發(fā)送命令"""
try:
if isinstance(command, str):
command = command.encode('utf-8')
self.ser.write(command)
print(f"發(fā)送命令: {command}")
except Exception as e:
print(f"發(fā)送命令失敗: {e}")
def stop_monitoring(self):
"""停止監(jiān)視"""
self.running = False
if self.thread:
self.thread.join()
self.ser.close()
print("串口監(jiān)視器已停止")
# 使用示例
def monitor_example():
monitor = SerialMonitor('COM3', 9600)
monitor.start_monitoring()
try:
# 主線程可以繼續(xù)執(zhí)行其他操作
for i in range(5):
monitor.send_command(f"GET DATA {i}\n")
time.sleep(2)
# 等待用戶輸入退出
input("按回車鍵停止監(jiān)視...")
finally:
monitor.stop_monitoring()
# 運(yùn)行示例
monitor_example()
5. 高級功能
5.1 串口掃描和自動重連
import serial
import serial.tools.list_ports
import time
import threading
class SmartSerialConnection:
"""智能串口連接管理器"""
def __init__(self, port=None, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = None
self.connected = False
self.reconnect_thread = None
self.reconnect_flag = False
def connect(self, port=None):
"""連接到串口"""
if port:
self.port = port
if not self.port:
print("未指定串口號,嘗試自動檢測...")
self.port = self._auto_detect_port()
if not self.port:
print("無法找到可用串口")
return False
try:
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1,
write_timeout=1
)
# 測試連接
time.sleep(2) # 等待設(shè)備初始化
self._test_connection()
self.connected = True
print(f"成功連接到 {self.port}")
return True
except serial.SerialException as e:
print(f"連接失敗: {e}")
self.connected = False
return False
def _auto_detect_port(self):
"""自動檢測串口"""
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"嘗試端口: {port.device}")
try:
# 嘗試打開端口
test_ser = serial.Serial(port.device, self.baudrate, timeout=0.5)
time.sleep(1)
# 發(fā)送測試命令
test_ser.write(b'AT\r\n')
time.sleep(0.1)
# 檢查響應(yīng)
if test_ser.in_waiting > 0:
response = test_ser.read(test_ser.in_waiting)
print(f"端口 {port.device} 響應(yīng): {response}")
test_ser.close()
return port.device
test_ser.close()
except:
continue
return None
def _test_connection(self):
"""測試連接是否正常"""
if not self.ser:
return False
try:
# 清空緩沖區(qū)
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
# 發(fā)送測試命令
self.ser.write(b'AT\r\n')
time.sleep(0.1)
# 嘗試讀取響應(yīng)
if self.ser.in_waiting > 0:
return True
else:
return False
except:
return False
def start_auto_reconnect(self, interval=5):
"""啟動自動重連"""
if self.reconnect_thread and self.reconnect_thread.is_alive():
print("自動重連已在運(yùn)行")
return
self.reconnect_flag = True
self.reconnect_thread = threading.Thread(
target=self._reconnect_loop,
args=(interval,),
daemon=True
)
self.reconnect_thread.start()
print("自動重連已啟動")
def _reconnect_loop(self, interval):
"""重連循環(huán)"""
while self.reconnect_flag:
if not self.connected:
print("嘗試重新連接...")
self.connect()
# 定期檢查連接狀態(tài)
if self.connected:
if not self._test_connection():
print("連接斷開")
self.connected = False
if self.ser:
self.ser.close()
time.sleep(interval)
def stop_auto_reconnect(self):
"""停止自動重連"""
self.reconnect_flag = False
if self.reconnect_thread:
self.reconnect_thread.join(timeout=2)
print("自動重連已停止")
def send(self, data):
"""發(fā)送數(shù)據(jù)"""
if not self.connected or not self.ser:
print("未連接,無法發(fā)送數(shù)據(jù)")
return False
try:
if isinstance(data, str):
data = data.encode('utf-8')
self.ser.write(data)
return True
except Exception as e:
print(f"發(fā)送失敗: {e}")
self.connected = False
return False
def receive(self, timeout=1):
"""接收數(shù)據(jù)"""
if not self.connected or not self.ser:
print("未連接,無法接收數(shù)據(jù)")
return None
try:
# 設(shè)置臨時超時
original_timeout = self.ser.timeout
self.ser.timeout = timeout
data = self.ser.readline()
# 恢復(fù)原始超時
self.ser.timeout = original_timeout
if data:
try:
return data.decode('utf-8').strip()
except:
return data.hex()
else:
return None
except Exception as e:
print(f"接收失敗: {e}")
return None
def close(self):
"""關(guān)閉連接"""
self.stop_auto_reconnect()
self.connected = False
if self.ser:
self.ser.close()
print("連接已關(guān)閉")
# 使用示例
def smart_connection_example():
serial_conn = SmartSerialConnection(baudrate=9600)
# 嘗試連接
if serial_conn.connect('COM3'):
print("連接成功")
# 啟動自動重連
serial_conn.start_auto_reconnect(interval=10)
# 發(fā)送和接收數(shù)據(jù)
for i in range(10):
serial_conn.send(f"TEST {i}\n")
response = serial_conn.receive()
if response:
print(f"收到: {response}")
time.sleep(1)
# 停止并關(guān)閉
serial_conn.close()
else:
print("連接失敗")
# 運(yùn)行示例
smart_connection_example()
5.2 數(shù)據(jù)協(xié)議解析器
import serial
import struct
import crcmod
import time
class SerialProtocol:
"""串口協(xié)議處理器"""
def __init__(self, ser):
self.ser = ser
self.buffer = bytearray()
# 初始化CRC計(jì)算
self.crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
def send_packet(self, command, data=None):
"""發(fā)送數(shù)據(jù)包"""
if data is None:
data = bytearray()
# 構(gòu)建數(shù)據(jù)包
packet = bytearray()
# 起始字節(jié)
packet.append(0xAA)
packet.append(0x55)
# 命令字節(jié)
packet.append(command)
# 數(shù)據(jù)長度
packet.append(len(data))
# 數(shù)據(jù)
packet.extend(data)
# 計(jì)算CRC
crc = self.crc16(bytes(packet))
packet.extend(struct.pack('<H', crc))
# 發(fā)送數(shù)據(jù)包
self.ser.write(packet)
print(f"發(fā)送數(shù)據(jù)包: {packet.hex()}")
return packet
def receive_packet(self, timeout=1):
"""接收數(shù)據(jù)包"""
start_time = time.time()
while time.time() - start_time < timeout:
# 讀取可用數(shù)據(jù)
if self.ser.in_waiting > 0:
self.buffer.extend(self.ser.read(self.ser.in_waiting))
# 嘗試解析數(shù)據(jù)包
packet = self._parse_buffer()
if packet:
return packet
return None
def _parse_buffer(self):
"""解析緩沖區(qū)中的數(shù)據(jù)包"""
# 查找起始字節(jié)
start_index = -1
for i in range(len(self.buffer) - 1):
if self.buffer[i] == 0xAA and self.buffer[i+1] == 0x55:
start_index = i
break
if start_index == -1:
return None
# 檢查數(shù)據(jù)包長度
if len(self.buffer[start_index:]) < 6: # 最小包長度
return None
# 獲取數(shù)據(jù)長度
data_length = self.buffer[start_index + 3]
total_length = 4 + data_length + 2 # 頭部4字節(jié) + 數(shù)據(jù) + CRC2字節(jié)
# 檢查是否接收到完整數(shù)據(jù)包
if len(self.buffer[start_index:]) < total_length:
return None
# 提取完整數(shù)據(jù)包
packet = self.buffer[start_index:start_index + total_length]
# 驗(yàn)證CRC
crc_received = struct.unpack('<H', packet[-2:])[0]
crc_calculated = self.crc16(packet[:-2])
if crc_received != crc_calculated:
print("CRC校驗(yàn)失敗")
# 移除錯誤的數(shù)據(jù)包
self.buffer = self.buffer[start_index + 1:]
return None
# 解析數(shù)據(jù)包
parsed = {
'command': packet[2],
'data_length': packet[3],
'data': packet[4:4+data_length],
'crc': crc_received,
'raw': packet
}
# 從緩沖區(qū)中移除已處理的數(shù)據(jù)
self.buffer = self.buffer[start_index + total_length:]
return parsed
def send_command_with_response(self, command, data=None, timeout=1, retries=3):
"""發(fā)送命令并等待響應(yīng)"""
for attempt in range(retries):
print(f"嘗試 {attempt + 1}/{retries}")
# 發(fā)送命令
self.send_packet(command, data)
# 等待響應(yīng)
response = self.receive_packet(timeout)
if response:
print(f"收到響應(yīng): 命令={response['command']}, 數(shù)據(jù)={response['data'].hex()}")
return response
print("未收到響應(yīng),重試...")
time.sleep(0.5)
print("所有重試均失敗")
return None
# 使用示例
def protocol_example():
try:
ser = serial.Serial('COM3', 115200, timeout=1)
protocol = SerialProtocol(ser)
# 示例1: 發(fā)送讀取傳感器命令
response = protocol.send_command_with_response(
command=0x01, # 讀取傳感器命令
data=bytearray([0x00, 0x01]), # 傳感器地址
timeout=2,
retries=3
)
if response:
# 解析傳感器數(shù)據(jù)
if len(response['data']) >= 4:
temperature, humidity = struct.unpack('<HH', response['data'][:4])
temperature = temperature / 10.0
humidity = humidity / 10.0
print(f"溫度: {temperature}°C, 濕度: {humidity}%")
# 示例2: 發(fā)送控制命令
control_data = bytearray([0x01, 0x01]) # 打開設(shè)備
protocol.send_command_with_response(
command=0x02, # 控制命令
data=control_data
)
ser.close()
except Exception as e:
print(f"錯誤: {e}")
# 運(yùn)行示例
protocol_example()
6. 錯誤處理和調(diào)試
import serial
import time
import logging
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RobustSerial:
"""健壯的串口通信類,包含錯誤處理和調(diào)試功能"""
def __init__(self, port, baudrate=9600, **kwargs):
self.port = port
self.baudrate = baudrate
self.kwargs = kwargs
self.ser = None
self.error_count = 0
self.max_errors = 10
self.last_error_time = 0
self.error_cooldown = 5 # 錯誤冷卻時間(秒)
def open(self):
"""打開串口"""
try:
logger.info(f"嘗試打開串口 {self.port}")
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
**self.kwargs
)
# 檢查串口是否真正打開
if not self.ser.is_open:
raise serial.SerialException("串口未成功打開")
logger.info(f"串口 {self.port} 已成功打開")
# 清空緩沖區(qū)
self._flush_buffers()
return True
except serial.SerialException as e:
logger.error(f"打開串口失敗: {e}")
self._handle_error("open", e)
return False
except Exception as e:
logger.error(f"未知錯誤: {e}")
self._handle_error("unknown", e)
return False
def _flush_buffers(self):
"""清空輸入輸出緩沖區(qū)"""
try:
if self.ser:
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
logger.debug("緩沖區(qū)已清空")
except Exception as e:
logger.warning(f"清空緩沖區(qū)失敗: {e}")
def write(self, data, retries=3):
"""寫入數(shù)據(jù),帶重試機(jī)制"""
for attempt in range(retries):
try:
if not self.ser or not self.ser.is_open:
logger.warning("串口未打開,嘗試重新打開")
if not self.open():
continue
if isinstance(data, str):
data = data.encode('utf-8')
bytes_written = self.ser.write(data)
logger.debug(f"寫入 {bytes_written} 字節(jié): {data[:50]}...")
# 確保數(shù)據(jù)發(fā)送完成
self.ser.flush()
return bytes_written
except serial.SerialTimeoutException:
logger.warning(f"寫入超時,嘗試 {attempt + 1}/{retries}")
time.sleep(0.1)
except serial.SerialException as e:
logger.error(f"寫入失敗: {e}")
self._handle_error("write", e)
# 嘗試重新打開串口
time.sleep(0.5)
self.open()
except Exception as e:
logger.error(f"未知寫入錯誤: {e}")
self._handle_error("write_unknown", e)
break
logger.error(f"寫入失敗,已重試 {retries} 次")
return 0
def read(self, size=None, timeout=None):
"""讀取數(shù)據(jù)"""
try:
if not self.ser or not self.ser.is_open:
logger.warning("串口未打開")
return None
# 設(shè)置臨時超時
original_timeout = self.ser.timeout
if timeout is not None:
self.ser.timeout = timeout
if size:
data = self.ser.read(size)
else:
# 讀取所有可用數(shù)據(jù)
data = self.ser.read(self.ser.in_waiting)
# 恢復(fù)原始超時
if timeout is not None:
self.ser.timeout = original_timeout
if data:
logger.debug(f"讀取 {len(data)} 字節(jié)")
return data
else:
return None
except serial.SerialException as e:
logger.error(f"讀取失敗: {e}")
self._handle_error("read", e)
return None
except Exception as e:
logger.error(f"未知讀取錯誤: {e}")
self._handle_error("read_unknown", e)
return None
def readline(self, timeout=None):
"""讀取一行"""
try:
if not self.ser or not self.ser.is_open:
logger.warning("串口未打開")
return None
# 設(shè)置臨時超時
original_timeout = self.ser.timeout
if timeout is not None:
self.ser.timeout = timeout
line = self.ser.readline()
# 恢復(fù)原始超時
if timeout is not None:
self.ser.timeout = original_timeout
if line:
logger.debug(f"讀取行: {line[:50]}...")
return line
else:
return None
except serial.SerialException as e:
logger.error(f"讀取行失敗: {e}")
self._handle_error("readline", e)
return None
def _handle_error(self, error_type, error):
"""處理錯誤"""
current_time = time.time()
# 檢查是否在冷卻期內(nèi)
if current_time - self.last_error_time < self.error_cooldown:
return
self.error_count += 1
self.last_error_time = current_time
logger.error(f"錯誤類型: {error_type}, 錯誤: {error}")
logger.error(f"錯誤計(jì)數(shù): {self.error_count}/{self.max_errors}")
# 如果錯誤過多,可能需要采取特殊措施
if self.error_count >= self.max_errors:
logger.critical("錯誤過多,建議檢查硬件連接")
self.error_count = 0 # 重置計(jì)數(shù)器
def close(self):
"""關(guān)閉串口"""
try:
if self.ser and self.ser.is_open:
self.ser.close()
logger.info("串口已關(guān)閉")
except Exception as e:
logger.error(f"關(guān)閉串口失敗: {e}")
def get_info(self):
"""獲取串口信息"""
if not self.ser:
return "串口未打開"
info = {
"port": self.ser.port,
"baudrate": self.ser.baudrate,
"bytesize": self.ser.bytesize,
"parity": self.ser.parity,
"stopbits": self.ser.stopbits,
"timeout": self.ser.timeout,
"is_open": self.ser.is_open,
"in_waiting": self.ser.in_waiting if self.ser.is_open else 0,
"error_count": self.error_count
}
return info
# 調(diào)試工具函數(shù)
def debug_serial_port(port, baudrate=9600):
"""調(diào)試串口"""
print(f"=== 串口調(diào)試: {port} ===")
# 1. 檢查端口是否存在
import serial.tools.list_ports
ports = [p.device for p in serial.tools.list_ports.comports()]
if port not in ports:
print(f"警告: 端口 {port} 不在可用端口列表中")
print(f"可用端口: {ports}")
# 2. 嘗試打開端口
try:
ser = serial.Serial(port, baudrate, timeout=1)
print(f"? 端口可以打開")
# 3. 獲取端口信息
print(f"端口信息:")
print(f" 名稱: {ser.name}")
print(f" 波特率: {ser.baudrate}")
print(f" 數(shù)據(jù)位: {ser.bytesize}")
print(f" 校驗(yàn)位: {ser.parity}")
print(f" 停止位: {ser.stopbits}")
# 4. 測試讀寫
print("\n測試讀寫功能:")
# 清空緩沖區(qū)
ser.reset_input_buffer()
ser.reset_output_buffer()
# 發(fā)送測試數(shù)據(jù)
test_data = b'AT\r\n'
ser.write(test_data)
print(f" 發(fā)送: {test_data}")
# 等待響應(yīng)
time.sleep(0.1)
# 讀取響應(yīng)
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f" 接收: {response}")
print(f" 解碼: {response.decode('utf-8', errors='ignore')}")
else:
print(" 警告: 沒有收到響應(yīng)")
# 5. 測試不同波特率
print("\n測試不同波特率:")
baudrates = [9600, 19200, 38400, 57600, 115200]
for baud in baudrates:
try:
ser.baudrate = baud
ser.write(b'AT\r\n')
time.sleep(0.1)
if ser.in_waiting > 0:
response = ser.read(ser.in_waiting)
print(f" {baud} bps: 收到響應(yīng) ({len(response)} 字節(jié))")
else:
print(f" {baud} bps: 無響應(yīng)")
except Exception as e:
print(f" {baud} bps: 錯誤 - {e}")
ser.close()
print("\n? 調(diào)試完成")
except serial.SerialException as e:
print(f"? 無法打開端口: {e}")
print("\n可能的解決方案:")
print("1. 檢查端口名稱是否正確")
print("2. 檢查設(shè)備是否連接")
print("3. 檢查是否有其他程序占用該端口")
print("4. 檢查驅(qū)動程序是否安裝")
print("5. 嘗試以管理員權(quán)限運(yùn)行")
except Exception as e:
print(f"? 未知錯誤: {e}")
# 使用示例
def debug_example():
# 調(diào)試特定端口
debug_serial_port('COM3', 9600)
# 使用健壯的串口類
logger.info("開始測試健壯串口類")
robust_ser = RobustSerial(
port='COM3',
baudrate=9600,
timeout=1,
write_timeout=1
)
if robust_ser.open():
# 發(fā)送數(shù)據(jù)
robust_ser.write("Hello Device!\n")
# 讀取響應(yīng)
response = robust_ser.readline(timeout=2)
if response:
print(f"收到響應(yīng): {response.decode('utf-8', errors='ignore')}")
# 獲取信息
info = robust_ser.get_info()
print(f"串口信息: {info}")
robust_ser.close()
# 運(yùn)行示例
debug_example()
7. 實(shí)際應(yīng)用示例
7.1 Arduino 通信示例
import serial
import time
import json
from datetime import datetime
class ArduinoCommunicator:
"""Arduino 串口通信類"""
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = None
self.connected = False
def connect(self):
"""連接到 Arduino"""
try:
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
time.sleep(2) # 等待 Arduino 重啟
self.connected = True
print(f"已連接到 Arduino ({self.port})")
return True
except Exception as e:
print(f"連接失敗: {e}")
return False
def send_command(self, command, value=None):
"""發(fā)送命令到 Arduino"""
if not self.connected:
print("未連接")
return None
try:
# 構(gòu)建命令字符串
if value is not None:
cmd_str = f"{command}:{value}\n"
else:
cmd_str = f"{command}\n"
# 發(fā)送命令
self.ser.write(cmd_str.encode('utf-8'))
print(f"發(fā)送命令: {cmd_str.strip()}")
# 讀取響應(yīng)
response = self.ser.readline().decode('utf-8', errors='ignore').strip()
if response:
print(f"Arduino 響應(yīng): {response}")
return response
except Exception as e:
print(f"通信錯誤: {e}")
return None
def read_sensors(self):
"""讀取所有傳感器數(shù)據(jù)"""
# 發(fā)送讀取命令
self.ser.write(b'READ_ALL\n')
# 等待響應(yīng)
time.sleep(0.5)
# 讀取所有可用數(shù)據(jù)
data_lines = []
while self.ser.in_waiting > 0:
line = self.ser.readline().decode('utf-8', errors='ignore').strip()
if line:
data_lines.append(line)
# 解析傳感器數(shù)據(jù)
sensors = {}
for line in data_lines:
if ':' in line:
key, value = line.split(':', 1)
try:
# 嘗試轉(zhuǎn)換為數(shù)值
sensors[key.strip()] = float(value.strip())
except:
sensors[key.strip()] = value.strip()
return sensors
def control_led(self, led_id, state):
"""控制 LED"""
state_str = "ON" if state else "OFF"
return self.send_command(f"LED{led_id}", state_str)
def read_analog(self, pin):
"""讀取模擬引腳值"""
return self.send_command(f"ANALOG{pin}")
def set_pwm(self, pin, value):
"""設(shè)置 PWM 值 (0-255)"""
return self.send_command(f"PWM{pin}", str(value))
def close(self):
"""關(guān)閉連接"""
if self.ser:
self.ser.close()
self.connected = False
print("連接已關(guān)閉")
# Arduino 數(shù)據(jù)采集應(yīng)用
def arduino_data_logger():
"""Arduino 數(shù)據(jù)記錄器"""
arduino = ArduinoCommunicator('COM3', 9600)
if not arduino.connect():
print("無法連接到 Arduino")
return
try:
# 創(chuàng)建數(shù)據(jù)文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"arduino_data_{timestamp}.csv"
with open(filename, 'w') as f:
# 寫入 CSV 頭部
f.write("timestamp,temperature,humidity,light,analog0,analog1\n")
print(f"數(shù)據(jù)將保存到: {filename}")
print("開始數(shù)據(jù)采集 (按 Ctrl+C 停止)...")
# 數(shù)據(jù)采集循環(huán)
while True:
try:
# 讀取傳感器數(shù)據(jù)
sensors = arduino.read_sensors()
if sensors:
# 獲取時間戳
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 準(zhǔn)備數(shù)據(jù)行
data_row = {
'timestamp': current_time,
'temperature': sensors.get('TEMP', 0),
'humidity': sensors.get('HUMI', 0),
'light': sensors.get('LIGHT', 0),
'analog0': sensors.get('A0', 0),
'analog1': sensors.get('A1', 0)
}
# 顯示數(shù)據(jù)
print(f"[{current_time}] "
f"溫度: {data_row['temperature']:.1f}°C, "
f"濕度: {data_row['humidity']:.1f}%, "
f"光照: {data_row['light']}")
# 保存到文件
with open(filename, 'a') as f:
csv_line = f"{data_row['timestamp']},"
csv_line += f"{data_row['temperature']},"
csv_line += f"{data_row['humidity']},"
csv_line += f"{data_row['light']},"
csv_line += f"{data_row['analog0']},"
csv_line += f"{data_row['analog1']}\n"
f.write(csv_line)
# 間隔時間
time.sleep(2)
except KeyboardInterrupt:
print("\n數(shù)據(jù)采集已停止")
break
except Exception as e:
print(f"數(shù)據(jù)采集錯誤: {e}")
time.sleep(1)
finally:
arduino.close()
print(f"數(shù)據(jù)已保存到: {filename}")
# 運(yùn)行示例
arduino_data_logger()
7.2 GPS 數(shù)據(jù)解析示例
import serial
import time
import pynmea2
class GPSReader:
"""GPS 數(shù)據(jù)讀取器"""
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = None
self.running = False
def start(self):
"""開始讀取 GPS 數(shù)據(jù)"""
try:
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
self.running = True
print("GPS 接收器已啟動")
# 讀取數(shù)據(jù)
self._read_loop()
except Exception as e:
print(f"啟動失敗: {e}")
def _read_loop(self):
"""讀取循環(huán)"""
buffer = ""
while self.running:
try:
# 讀取數(shù)據(jù)
if self.ser.in_waiting > 0:
data = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
buffer += data
# 處理完整的 NMEA 語句
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line.startswith('$'):
self._parse_nmea(line)
time.sleep(0.1)
except KeyboardInterrupt:
print("\n正在停止 GPS 接收器...")
self.stop()
break
except Exception as e:
print(f"讀取錯誤: {e}")
time.sleep(1)
def _parse_nmea(self, sentence):
"""解析 NMEA 語句"""
try:
msg = pynmea2.parse(sentence)
# 處理不同類型的 NMEA 語句
if isinstance(msg, pynmea2.types.talker.RMC):
# 推薦最小定位信息
if msg.status == 'A': # 數(shù)據(jù)有效
print(f"位置: {msg.latitude:.6f}, {msg.longitude:.6f}")
print(f"速度: {msg.spd_over_grnd} 節(jié), 航向: {msg.true_course}°")
print(f"時間: {msg.datetime}")
elif isinstance(msg, pynmea2.types.talker.GGA):
# GPS 定位信息
print(f"質(zhì)量: {msg.gps_qual}, 衛(wèi)星數(shù): {msg.num_sats}")
print(f"海拔: {msg.altitude} {msg.altitude_units}")
print(f"大地水準(zhǔn)面高度: {msg.geo_sep} {msg.geo_sep_units}")
elif isinstance(msg, pynmea2.types.talker.GSA):
# 當(dāng)前衛(wèi)星信息
print(f"模式: {msg.mode}, 定位類型: {msg.mode_fix_type}")
print(f"PDOP: {msg.pdop}, HDOP: {msg.hdop}, VDOP: {msg.vdop}")
except pynmea2.ParseError as e:
# 忽略解析錯誤
pass
except Exception as e:
print(f"解析錯誤: {e}")
def stop(self):
"""停止讀取"""
self.running = False
if self.ser:
self.ser.close()
print("GPS 接收器已停止")
# 使用示例
def gps_example():
"""GPS 數(shù)據(jù)讀取示例"""
gps = GPSReader('COM4', 4800) # 典型 GPS 波特率為 4800
try:
# 開始讀取 GPS 數(shù)據(jù)
gps.start()
except Exception as e:
print(f"GPS 示例錯誤: {e}")
# 運(yùn)行示例
gps_example()
7.3 串口聊天程序
import serial
import threading
import time
import sys
class SerialChat:
"""簡單的串口聊天程序"""
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.ser = None
self.running = False
self.receive_thread = None
def start(self):
"""開始聊天"""
try:
# 打開串口
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1
)
self.running = True
# 啟動接收線程
self.receive_thread = threading.Thread(target=self._receive_messages)
self.receive_thread.daemon = True
self.receive_thread.start()
print(f"=== 串口聊天 ({self.port}) ===")
print("輸入消息并按回車發(fā)送")
print("輸入 'quit' 或 'exit' 退出")
print("=" * 40)
# 發(fā)送循環(huán)
self._send_loop()
except Exception as e:
print(f"啟動失敗: {e}")
def _receive_messages(self):
"""接收消息線程"""
buffer = ""
while self.running:
try:
if self.ser and self.ser.in_waiting > 0:
data = self.ser.read(self.ser.in_waiting).decode('utf-8', errors='ignore')
buffer += data
# 處理完整行
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if line:
print(f"\n[接收] {line}")
print("[發(fā)送] ", end="", flush=True)
time.sleep(0.01)
except Exception as e:
if self.running: # 只有在運(yùn)行中才報(bào)告錯誤
print(f"接收錯誤: {e}")
time.sleep(0.1)
def _send_loop(self):
"""發(fā)送消息循環(huán)"""
try:
while self.running:
# 獲取用戶輸入
message = input("[發(fā)送] ")
# 檢查退出命令
if message.lower() in ['quit', 'exit', 'q']:
print("正在退出...")
self.stop()
break
# 發(fā)送消息
if message and self.ser:
self.ser.write((message + '\n').encode('utf-8'))
except KeyboardInterrupt:
print("\n正在退出...")
self.stop()
except Exception as e:
print(f"發(fā)送錯誤: {e}")
self.stop()
def stop(self):
"""停止聊天"""
self.running = False
if self.receive_thread:
self.receive_thread.join(timeout=1)
if self.ser:
self.ser.close()
print("聊天已結(jié)束")
# 使用示例
def chat_example():
"""串口聊天示例"""
if len(sys.argv) > 1:
port = sys.argv[1]
else:
port = input("請輸入串口號 (如 COM3): ")
if len(sys.argv) > 2:
baudrate = int(sys.argv[2])
else:
baudrate = int(input("請輸入波特率 (默認(rèn) 9600): ") or "9600")
chat = SerialChat(port, baudrate)
chat.start()
if __name__ == "__main__":
chat_example()
總結(jié)
本教程涵蓋了 PySerial 的主要功能:
- 基礎(chǔ)使用:安裝、打開串口、基本讀寫
- 串口配置:參數(shù)設(shè)置、自動檢測端口
- 數(shù)據(jù)操作:文本和二進(jìn)制數(shù)據(jù)通信
- 高級功能:協(xié)議處理、錯誤處理、自動重連
- 實(shí)際應(yīng)用:Arduino 通信、GPS 解析、聊天程序
最佳實(shí)踐建議:
- 錯誤處理:始終使用 try-except 包裝串口操作
- 超時設(shè)置:合理設(shè)置 timeout 和 write_timeout
- 緩沖區(qū)管理:定期清空輸入輸出緩沖區(qū)
- 資源清理:確保在程序結(jié)束時關(guān)閉串口
- 線程安全:在多線程環(huán)境中使用適當(dāng)?shù)逆i機(jī)制
常見問題解決:
- 無法打開串口:檢查端口名稱、權(quán)限、是否被其他程序占用
- 數(shù)據(jù)亂碼:檢查波特率設(shè)置、數(shù)據(jù)編碼
- 通信不穩(wěn)定:檢查硬件連接、線纜質(zhì)量、接地
- 速度慢:調(diào)整波特率,優(yōu)化讀寫策略
到此這篇關(guān)于PySerial 串口通信的多種實(shí)現(xiàn)方法的文章就介紹到這了,更多相關(guān)PySerial 串口通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
tensorflow轉(zhuǎn)onnx的實(shí)現(xiàn)方法
本文主要介紹了tensorflow轉(zhuǎn)onnx的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
python圖片剪裁代碼(圖片按四個點(diǎn)坐標(biāo)剪裁)
這篇文章主要介紹了python圖片剪裁代碼(圖片按四個點(diǎn)坐標(biāo)剪裁),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03
python的格式化輸出(format,%)實(shí)例詳解
Python中格式化字符串目前有兩種陣營:%和format,哪一種比較適合我們使用呢?下面腳本之家小編給大家介紹下python的格式化輸出(format,%)實(shí)例詳解,感興趣的朋友一起看看吧2018-06-06

