PyMySQL數(shù)據(jù)庫連接與優(yōu)化方式
本文將介紹如何使用 PyMySQL 連接和操作 MySQL 數(shù)據(jù)庫,包括基本連接、CRUD 操作、事務(wù)處理以及如何在高并發(fā)環(huán)境下使用連接池優(yōu)化性能。
通過合理的連接池配置和錯(cuò)誤處理機(jī)制,可以構(gòu)建出穩(wěn)定高效的數(shù)據(jù)庫應(yīng)用。
一、PyMySQL 簡介
PyMySQL 是一個(gè)純 Python 實(shí)現(xiàn)的 MySQL 客戶端庫,用于連接和操作 MySQL 數(shù)據(jù)庫。它完全兼容 Python DB API 2.0 規(guī)范,提供了簡單易用的接口來執(zhí)行 SQL 查詢和操作。
核心優(yōu)勢
- 純 Python 實(shí)現(xiàn):無需外部依賴,跨平臺兼容性好
- Python 3 全面支持:兼容最新 Python 特性和語法
- 線程安全:支持多線程并發(fā)操作
- 完整功能支持:事務(wù)、存儲過程、預(yù)處理語句等
- 廣泛兼容:支持 MySQL 5.5+ 和 MariaDB
安裝方法
pip install pymysql
二、數(shù)據(jù)庫連接配置
基礎(chǔ)連接方式
import pymysql
from pymysql.cursors import DictCursor
# 推薦配置方式
def create_connection():
return pymysql.connect(
host='localhost', # 數(shù)據(jù)庫地址
user='username', # 用戶名
password='password', # 密碼
database='test_db', # 數(shù)據(jù)庫名
port=3306, # 端口,默認(rèn)3306
charset='utf8mb4', # 字符集,推薦utf8mb4
autocommit=False, # 是否自動提交
cursorclass=DictCursor # 返回字典格式結(jié)果
)
連接參數(shù)說明
| 參數(shù) | 說明 | 值 |
|---|---|---|
| host | 數(shù)據(jù)庫服務(wù)器地址 | ‘localhost’ |
| user | 用戶名 | 根據(jù)實(shí)際配置 |
| password | 密碼 | 根據(jù)實(shí)際配置 |
| database | 數(shù)據(jù)庫名稱 | 項(xiàng)目數(shù)據(jù)庫名 |
| charset | 字符編碼 | ‘utf8mb4’(支持表情符號) |
| autocommit | 自動提交事務(wù) | False(建議手動控制) |
| cursorclass | 游標(biāo)類型 | DictCursor(結(jié)果以字典返回) |
cursorclass參數(shù)說明
| cursorclass | 說明 | 返回結(jié)果格式 | 適用場景 |
|---|---|---|---|
| Cursor (默認(rèn)) | 普通游標(biāo) | 元組格式 (value1, value2, …) | 基礎(chǔ)查詢,需要最高性能時(shí) |
| DictCursor | 字典游標(biāo) | 字典格式 {‘column’: value} | 需要按列名訪問數(shù)據(jù)時(shí) |
| SSCursor | 無緩沖游標(biāo) | 元組格式,流式讀取 | 處理大量數(shù)據(jù),內(nèi)存有限時(shí) |
| SSDictCursor | 無緩沖字典游標(biāo) | 字典格式,流式讀取 | 大量數(shù)據(jù)且需要按列名訪問 |
| Cursor 子類 | 自定義游標(biāo) | 自定義格式 | 特殊數(shù)據(jù)處理需求 |
完整連接示例
import pymysql
from pymysql.cursors import DictCursor
def get_db_connection():
"""獲取數(shù)據(jù)庫連接"""
return pymysql.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase',
charset='utf8mb4',
autocommit=False,
cursorclass=DictCursor,
connect_timeout=10 # 連接超時(shí)10秒
)
# 使用示例
def test_connection():
conn = get_db_connection()
try:
with conn.cursor() as cursor:
cursor.execute("SELECT 1 as test")
result = cursor.fetchone()
print("連接測試成功:", result)
finally:
conn.close()
test_connection()
輸出:
連接測試成功: {'test': 1}
三、數(shù)據(jù)庫基礎(chǔ)操作
創(chuàng)建示例數(shù)據(jù)表
CREATE TABLE mydb.users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
相關(guān)說明
| 關(guān)鍵字 | 類型 | 說明 |
|---|---|---|
| INT | 數(shù)據(jù)類型 | 整數(shù)類型,用于存儲整數(shù)值 |
| AUTO_INCREMENT | 約束/屬性 | 自動遞增,每次插入新記錄時(shí)自動生成唯一ID |
| PRIMARY KEY | 約束 | 主鍵,唯一標(biāo)識每條記錄 |
| VARCHAR(100) | 數(shù)據(jù)類型 | 可變長度字符串,最大100字符 |
| NOT NULL | 約束 | 該字段不能為空,必須包含值 |
| UNIQUE | 約束 | 確保每個(gè)值唯一,不允許重復(fù) |
| TIMESTAMP | 數(shù)據(jù)類型 | 時(shí)間戳類型,用于存儲日期和時(shí)間 |
| DEFAULT CURRENT_TIMESTAMP | 默認(rèn)值 | 默認(rèn)值為當(dāng)前系統(tǒng)時(shí)間 |
數(shù)據(jù)庫操作封裝類
import pymysql
from pymysql.cursors import DictCursor
from typing import List, Dict, Any, Optional, Tuple
class MySQLManager:
"""MySQL 數(shù)據(jù)庫管理類"""
def __init__(self, config: Dict[str, Any]):
self.config = config
def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
"""執(zhí)行查詢語句(SELECT)"""
conn = pymysql.connect(**self.config)
try:
with conn.cursor(DictCursor) as cursor:
cursor.execute(sql, params or ())
return cursor.fetchall()
finally:
conn.close()
def execute_update(self, sql: str, params: Tuple = None) -> int:
"""執(zhí)行更新語句(INSERT/UPDATE/DELETE)"""
conn = pymysql.connect(**self.config)
try:
with conn.cursor() as cursor:
affected_rows = cursor.execute(sql, params or ())
conn.commit()
return affected_rows
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
CRUD 操作示例
| 操作 | 英文 | 中文 | 對應(yīng) SQL | 描述 |
|---|---|---|---|---|
| C | Create | 創(chuàng)建 | INSERT | 創(chuàng)建新記錄 |
| R | Read | 讀取 | SELECT | 查詢/讀取數(shù)據(jù) |
| U | Update | 更新 | UPDATE | 修改現(xiàn)有記錄 |
| D | Delete | 刪除 | DELETE | 刪除記錄 |
# 數(shù)據(jù)庫配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test_db',
'charset': 'utf8mb4',
'cursorclass': DictCursor
}
db = MySQLManager(db_config)
# 1. 插入數(shù)據(jù)
def add_user(name: str, email: str, age: int) -> int:
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
return db.execute_update(sql, (name, email, age))
# 2. 查詢數(shù)據(jù)
def get_all_users() -> List[Dict]:
return db.execute_query("SELECT * FROM users")
# 3. 更新數(shù)據(jù)
def update_user_email(user_id: int, new_email: str) -> int:
sql = "UPDATE users SET email = %s WHERE id = %s"
return db.execute_update(sql, (new_email, user_id))
# 4. 刪除數(shù)據(jù)
def delete_user(user_id: int) -> int:
return db.execute_update("DELETE FROM users WHERE id = %s", (user_id,))
if __name__ == '__main__':
users = get_all_users()
print(f"查詢所有用戶: {users}")
user_id = add_user("張三", "zhangsan@example.com", 25)
print(f"執(zhí)行:插入新用戶")
users = get_all_users()
print(f"查詢所有用戶: {users}")
user_id = users[0]['id']
affected_rows = update_user_email(user_id, "zhangsan2@example.com")
print(f"執(zhí)行:更新郵箱,影響行數(shù): {affected_rows}")
users = get_all_users()
print(f"查詢所有用戶: {users}")
affected_rows = delete_user(user_id)
print(f"執(zhí)行:刪除用戶,影響行數(shù): {affected_rows}")
users = get_all_users()
print(f"查詢所有用戶: {users}")
輸出:
查詢所有用戶: ()
執(zhí)行:插入新用戶
查詢所有用戶: [{'id': 3, 'name': '張三', 'email': 'zhangsan@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
執(zhí)行:更新郵箱,影響行數(shù): 1
查詢所有用戶: [{'id': 3, 'name': '張三', 'email': 'zhangsan2@example.com', 'age': 25, 'created_at': datetime.datetime(2025, 9, 23, 19, 25, 11)}]
執(zhí)行:刪除用戶,影響行數(shù): 1
查詢所有用戶: ()
事務(wù)處理示例
模擬簡單的轉(zhuǎn)賬操作,從一個(gè)用戶賬戶轉(zhuǎn)移到另一個(gè)用戶賬戶。
def transfer_points(sender_id: int, receiver_id: int, points: int) -> bool:
"""轉(zhuǎn)賬操作(事務(wù)示例)"""
conn = pymysql.connect(**db_config)
try:
with conn.cursor(DictCursor) as cursor:
# 檢查發(fā)送者余額
cursor.execute("SELECT points FROM accounts WHERE user_id = %s", (sender_id,))
sender = cursor.fetchone()
if not sender or sender['points'] < points:
raise ValueError("余額不足")
# 執(zhí)行轉(zhuǎn)賬
cursor.execute("UPDATE accounts SET points = points - %s WHERE user_id = %s",
(points, sender_id))
cursor.execute("UPDATE accounts SET points = points + %s WHERE user_id = %s",
(points, receiver_id))
conn.commit()
return True
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
批量操作
def batch_insert_users(users: List[tuple]) -> int:
"""批量插入用戶數(shù)據(jù)"""
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
conn = pymysql.connect(**db_config)
try:
with conn.cursor() as cursor:
affected_rows = cursor.executemany(sql, users)
conn.commit()
return affected_rows
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
# 使用示例
users_data = [
('張三', 'zhangsan@example.com', 25),
('李四', 'lisi@example.com', 30)
]
batch_insert_users(users_data)
四、連接池優(yōu)化
為什么需要連接池
頻繁創(chuàng)建和關(guān)閉數(shù)據(jù)庫連接會導(dǎo)致:
- 資源浪費(fèi)(TCP 連接建立開銷)
- 性能下降(連接初始化時(shí)間)
- 連接數(shù)耗盡(超過數(shù)據(jù)庫最大連接數(shù))
連接池通過復(fù)用連接解決這些問題。
使用 DBUtils 實(shí)現(xiàn)連接池
安裝方法
pip install DBUtils
實(shí)現(xiàn)示例
from dbutils.pooled_db import PooledDB
import pymysql
import threading
from typing import List, Dict, Any, Tuple
from pymysql.cursors import DictCursor
class ConnectionPool:
"""數(shù)據(jù)庫連接池"""
_instance = None
_lock = threading.Lock()
def __new__(cls, config: Dict[str, Any]):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.pool_config = config.copy()
cls._instance._pool = PooledDB(
creator=pymysql,
maxconnections=20, # 最大連接數(shù)
mincached=2, # 初始空閑連接
maxcached=10, # 最大空閑連接
blocking=True, # 連接耗盡時(shí)等待
ping=1, # 使用時(shí)檢查連接
**config
)
return cls._instance
def get_connection(self):
"""從連接池獲取連接"""
return self._pool.connection()
# 使用連接池的數(shù)據(jù)庫管理器
class PooledDBManager:
def __init__(self, pool_config: Dict[str, Any]):
self.pool = ConnectionPool(pool_config)
def execute_query(self, sql: str, params: Tuple = None) -> List[Dict]:
"""執(zhí)行查詢"""
conn = self.pool.get_connection()
try:
with conn.cursor(DictCursor) as cursor:
cursor.execute(sql, params or ())
return cursor.fetchall()
finally:
conn.close() # 實(shí)際是放回連接池
def execute_update(self, sql: str, params: Tuple = None) -> int:
"""執(zhí)行更新"""
conn = self.pool.get_connection()
try:
with conn.cursor() as cursor:
affected_rows = cursor.execute(sql, params or ())
conn.commit()
return affected_rows
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
ping 參數(shù)說明
0 = 不檢查 1 = 每次請求時(shí)檢查(推薦) 2 = 每次游標(biāo)創(chuàng)建時(shí)檢查 4 = 每次執(zhí)行時(shí)檢查 7 = 1+2+4(所有檢查)
五、應(yīng)用示例
Flask 集成示例
from dbutils.pooled_db import PooledDB
from flask import Flask, request, jsonify
from pymysql.cursors import DictCursor
app = Flask(__name__)
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test_db',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
# 初始化連接池
db_manager = PooledDBManager(db_config)
@app.route('/users', methods=['GET'])
def get_users():
"""獲取所有用戶"""
try:
users = db_manager.execute_query("SELECT * FROM users")
return jsonify({'success': True, 'data': users})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/users', methods=['POST'])
def create_user():
"""創(chuàng)建用戶"""
try:
data = request.json
sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
result = db_manager.execute_update(sql, (data['name'], data['email'], data['age']))
return jsonify({'success': True, 'affected_rows': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)
連接池實(shí)踐配置
# 優(yōu)化后的連接池配置
optimal_pool_config = {
'maxconnections': 20, # 根據(jù)并發(fā)量調(diào)整
'mincached': 2, # 減少初始資源占用
'maxcached': 10, # 控制最大空閑連接
'blocking': True, # 避免連接耗盡錯(cuò)誤
'ping': 1, # 使用前檢查連接健康
**db_config # 基礎(chǔ)數(shù)據(jù)庫配置
}
錯(cuò)誤重試機(jī)制
數(shù)據(jù)庫操作重試裝飾器:當(dāng)數(shù)據(jù)庫連接出現(xiàn)臨時(shí)故障時(shí),會自動進(jìn)行最多3次重試,并且每次重試間隔時(shí)間按指數(shù)增長(1秒、2秒、4秒),提高程序的容錯(cuò)能力。
import time
from functools import wraps
import pymysql
def retry_on_failure(max_retries=3, initial_delay=1):
"""數(shù)據(jù)庫操作重試裝飾器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (pymysql.OperationalError, pymysql.InterfaceError) as e:
if attempt == max_retries - 1:
raise e
time.sleep(initial_delay * (2 ** attempt)) # 指數(shù)退避
return None
return wrapper
return decorator
# 使用示例
@retry_on_failure(max_retries=3)
def robust_query(sql, params=None):
return db_manager.execute_query(sql, params)
指數(shù)退避:當(dāng)操作失敗時(shí),不立即重試,而是等待一段時(shí)間,且每次重試的等待時(shí)間呈指數(shù)級增長。等待 1 秒, 2 秒, 4 秒,8 秒…
六、SQL事務(wù)操作對比
事務(wù)影響
| 操作類型 | 語法示例 | 主要用途 | 返回值 | 事務(wù)影響 | 性能考慮 | 使用場景 |
|---|---|---|---|---|---|---|
| SELECT (查詢) | SELECT * FROM users WHERE age > 18; | 從數(shù)據(jù)庫中檢索數(shù)據(jù) | 結(jié)果集(0行或多行) | 只讀操作,不影響數(shù)據(jù) | 索引優(yōu)化很重要,避免全表掃描 | 數(shù)據(jù)查詢、報(bào)表生成、數(shù)據(jù)分析 |
| UPDATE (更新) | UPDATE users SET age = 20 WHERE id = 1; | 修改現(xiàn)有記錄 | 受影響的行數(shù) | 需要事務(wù)控制,會鎖定行 | WHERE 條件要精確,避免鎖表 | 修改用戶信息、更新狀態(tài)、調(diào)整數(shù)值 |
| INSERT (插入) | INSERT INTO users (name, age) VALUES (‘張三’, 25); | 添加新記錄 | 插入的行數(shù)(通常是1) | 需要事務(wù)控制 | 批量插入比單條插入高效 | 新增用戶、創(chuàng)建訂單、記錄日志 |
| DELETE (刪除) | DELETE FROM users WHERE id = 1; | 刪除記錄 | 受影響的行數(shù) | 需要事務(wù)控制,謹(jǐn)慎使用 | 建議軟刪除,避免物理刪除 | 刪除用戶、清理數(shù)據(jù)、撤銷操作 |
事務(wù)特性
| 操作 | 是否自動提交 | 鎖級別 | 回滾支持 | 并發(fā)影響 |
|---|---|---|---|---|
| SELECT | 是(可設(shè)置) | 共享鎖 | 可回滾到快照 | 低(讀寫不阻塞) |
| UPDATE | 否 | 排他鎖 | 完全支持 | 高(會阻塞其他寫操作) |
| INSERT | 否 | 排他鎖 | 完全支持 | 中(可能觸發(fā)索引重建) |
| DELETE | 否 | 排他鎖 | 完全支持 | 高(會阻塞其他操作) |
- 排他鎖(X鎖):寫鎖,一個(gè)事務(wù)獨(dú)占資源,其他事務(wù)不能讀寫
- 共享鎖(S鎖):讀鎖,多個(gè)事務(wù)可同時(shí)讀取,但不能寫入
- 排他鎖 = 獨(dú)占,共享鎖 = 共享讀
普通 SELECT 是完全無鎖的,不會阻塞其他事務(wù)的寫操作,也不會被寫操作阻塞。只有顯式加鎖的SELECT才會影響并發(fā)。
七、總結(jié)
連接管理
- 使用連接池管理數(shù)據(jù)庫連接
- 合理配置連接池參數(shù)
- 及時(shí)釋放連接回池
事務(wù)控制
- 明確控制事務(wù)邊界
- 及時(shí)提交或回滾事務(wù)
- 處理并發(fā)場景下的數(shù)據(jù)一致性
錯(cuò)誤處理
- 實(shí)現(xiàn)適當(dāng)?shù)闹卦嚈C(jī)制
- 記錄詳細(xì)的錯(cuò)誤日志
- 區(qū)分業(yè)務(wù)錯(cuò)誤和系統(tǒng)錯(cuò)誤
性能優(yōu)化
- 使用預(yù)處理語句防止 SQL 注入
- 合理使用批量操作
- 監(jiān)控連接池使用情況
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python操作Elasticsearch處理timeout超時(shí)
這篇文章主要介紹了Python操作Elasticsearch處理timeout超時(shí),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
Python使用poplib模塊和smtplib模塊收發(fā)電子郵件的教程
smtplib模塊一般我們比較熟悉、這里我們會來講解使用smtplib發(fā)送SSL/TLS安全郵件的方法,而poplib模塊則負(fù)責(zé)處理接收pop3協(xié)議的郵件,下面我們就來看Python使用poplib模塊和smtplib模塊收發(fā)電子郵件的教程2016-07-07
python?kornia計(jì)算機(jī)視覺庫實(shí)現(xiàn)圖像變化
這篇文章主要為大家介紹了python?kornia計(jì)算機(jī)視覺庫實(shí)現(xiàn)圖像變化算法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Python實(shí)現(xiàn)WGS84火星百度及web墨卡托四種坐標(biāo)系相互轉(zhuǎn)換
主流被使用的地理坐標(biāo)系并不統(tǒng)一,常用的有WGS84、GCJ02(火星坐標(biāo)系)、BD09(百度坐標(biāo)系)以及百度地圖中保存矢量信息的web墨卡托,本文利用Python編寫相關(guān)類以實(shí)現(xiàn)4種坐標(biāo)系統(tǒng)之間的互相轉(zhuǎn)換2023-08-08
和孩子一起學(xué)習(xí)python之變量命名規(guī)則
這篇文章我們給大家總結(jié)了關(guān)于兒童學(xué)習(xí)python中的變量命名規(guī)則相關(guān)知識點(diǎn)內(nèi)容,有興趣的朋友跟著參考學(xué)習(xí)下。2018-05-05

