基于Python和Telegram API構(gòu)建一個(gè)消息機(jī)器人
1. 引言:Telegram機(jī)器人的強(qiáng)大功能與應(yīng)用場(chǎng)景
1.1 Telegram機(jī)器人的市場(chǎng)地位
Telegram作為全球最受歡迎的即時(shí)通訊應(yīng)用之一,擁有超過(guò)7億月活躍用戶。其開放的API和強(qiáng)大的機(jī)器人平臺(tái)為開發(fā)者提供了無(wú)限的可能性。根據(jù)官方統(tǒng)計(jì),Telegram平臺(tái)上已有超過(guò)50萬(wàn)個(gè)活躍機(jī)器人,每天處理數(shù)十億條消息。
1.2 機(jī)器人的實(shí)際應(yīng)用價(jià)值
Telegram機(jī)器人在各個(gè)領(lǐng)域都發(fā)揮著重要作用:
- 客戶服務(wù):24/7自動(dòng)應(yīng)答,處理常見問(wèn)題
- 內(nèi)容推送:新聞聚合、博客更新、價(jià)格提醒
- 自動(dòng)化工具:文件轉(zhuǎn)換、數(shù)據(jù)查詢、任務(wù)管理
- 娛樂(lè)互動(dòng):游戲、投票、問(wèn)答系統(tǒng)
- 商業(yè)應(yīng)用:電商助手、支付提醒、訂單跟蹤
1.3 Python在機(jī)器人開發(fā)中的優(yōu)勢(shì)
Python因其簡(jiǎn)潔的語(yǔ)法和豐富的生態(tài)系統(tǒng),成為開發(fā)Telegram機(jī)器人的首選語(yǔ)言:
# Python開發(fā)機(jī)器人的核心優(yōu)勢(shì)
advantages = {
"語(yǔ)法簡(jiǎn)潔": "快速原型開發(fā),代碼可讀性強(qiáng)",
"生態(tài)豐富": "python-telegram-bot等成熟庫(kù)",
"異步支持": "高效處理并發(fā)消息",
"部署簡(jiǎn)單": "多種部署方案可選",
"社區(qū)活躍": "豐富的學(xué)習(xí)資源和解決方案"
}
2. 環(huán)境準(zhǔn)備與基礎(chǔ)配置
2.1 獲取Telegram Bot Token
在開始開發(fā)之前,我們需要先創(chuàng)建機(jī)器人并獲取訪問(wèn)憑證:
#!/usr/bin/env python3
"""
Telegram機(jī)器人開發(fā)環(huán)境配置
"""
import os
from typing import Dict, Any
class BotConfig:
"""
機(jī)器人配置管理類
"""
def __init__(self):
self.token = None
self.webhook_url = None
self.admin_ids = []
def setup_environment(self) -> bool:
"""
設(shè)置開發(fā)環(huán)境
Returns:
bool: 配置是否成功
"""
# 從環(huán)境變量獲取Token
self.token = os.getenv('TELEGRAM_BOT_TOKEN')
if not self.token:
print("? 未找到TELEGRAM_BOT_TOKEN環(huán)境變量")
print("請(qǐng)按照以下步驟配置:")
print("1. 在Telegram中搜索 @BotFather")
print("2. 發(fā)送 /newbot 創(chuàng)建新機(jī)器人")
print("3. 設(shè)置機(jī)器人名稱和用戶名")
print("4. 復(fù)制獲得的Token")
print("5. 設(shè)置環(huán)境變量: export TELEGRAM_BOT_TOKEN='你的token'")
return False
# 設(shè)置webhook URL(可選,用于生產(chǎn)環(huán)境)
self.webhook_url = os.getenv('WEBHOOK_URL', '')
# 管理員ID列表
admin_env = os.getenv('ADMIN_IDS', '')
if admin_env:
self.admin_ids = [int(id.strip()) for id in admin_env.split(',')]
print("? 環(huán)境配置完成")
print(f" 機(jī)器人Token: {self.token[:10]}...")
print(f" 管理員ID: {self.admin_ids}")
return True
def validate_token(self, token: str) -> bool:
"""
驗(yàn)證Token格式
Args:
token: 機(jī)器人Token
Returns:
bool: Token格式是否正確
"""
if not token or not isinstance(token, str):
return False
# Telegram Bot Token格式: 數(shù)字:字母
parts = token.split(':')
if len(parts) != 2:
return False
# 第一部分應(yīng)為純數(shù)字
if not parts[0].isdigit():
return False
# 第二部分應(yīng)包含字母數(shù)字
if not parts[1].replace('_', '').isalnum():
return False
return True
def test_bot_connection(token: str) -> Dict[str, Any]:
"""
測(cè)試機(jī)器人連接
Args:
token: 機(jī)器人Token
Returns:
dict: 連接測(cè)試結(jié)果
"""
import requests
url = f"https://api.telegram.org/bot{token}/getMe"
try:
response = requests.get(url, timeout=10)
data = response.json()
if data.get('ok'):
bot_info = data['result']
return {
'success': True,
'bot_username': bot_info.get('username'),
'bot_name': bot_info.get('first_name'),
'can_join_groups': bot_info.get('can_join_groups', False),
'can_read_messages': bot_info.get('can_read_all_group_messages', False)
}
else:
return {
'success': False,
'error': data.get('description', 'Unknown error')
}
except requests.exceptions.RequestException as e:
return {
'success': False,
'error': f"網(wǎng)絡(luò)請(qǐng)求失敗: {e}"
}
# 配置演示
if __name__ == "__main__":
config = BotConfig()
if config.setup_environment():
# 測(cè)試連接
result = test_bot_connection(config.token)
if result['success']:
print(f"? 機(jī)器人連接成功!")
print(f" 用戶名: @{result['bot_username']}")
print(f" 名稱: {result['bot_name']}")
print(f" 可加入群組: {result['can_join_groups']}")
else:
print(f"? 連接失敗: {result['error']}")
else:
print("請(qǐng)先配置環(huán)境變量")
2.2 安裝必要的Python庫(kù)
"""
依賴管理腳本
"""
import subprocess
import sys
import importlib
def install_package(package: str) -> bool:
"""
安裝Python包
Args:
package: 包名稱
Returns:
bool: 安裝是否成功
"""
try:
subprocess.check_call([
sys.executable, "-m", "pip", "install", package
])
return True
except subprocess.CalledProcessError:
return False
def check_dependencies() -> Dict[str, bool]:
"""
檢查依賴包是否已安裝
Returns:
dict: 依賴包安裝狀態(tài)
"""
required_packages = {
'python-telegram-bot': 'telegram',
'requests': 'requests',
'python-dotenv': 'dotenv',
'aiohttp': 'aiohttp',
'pytz': 'pytz',
'schedule': 'schedule'
}
status = {}
for package, import_name in required_packages.items():
try:
importlib.import_module(import_name)
status[package] = True
print(f"? {package} 已安裝")
except ImportError:
status[package] = False
print(f"? {package} 未安裝")
return status
def setup_dependencies() -> bool:
"""
安裝所有必需的依賴包
Returns:
bool: 所有依賴是否安裝成功
"""
print("開始安裝依賴包...")
packages = [
'python-telegram-bot[job-queue]==20.7',
'requests==2.31.0',
'python-dotenv==1.0.0',
'aiohttp==3.8.5',
'pytz==2023.3',
'schedule==1.2.0'
]
success_count = 0
for package in packages:
print(f"安裝 {package}...")
if install_package(package):
success_count += 1
print(f"? {package} 安裝成功")
else:
print(f"? {package} 安裝失敗")
print(f"\n安裝完成: {success_count}/{len(packages)}")
return success_count == len(packages)
if __name__ == "__main__":
print("檢查依賴環(huán)境...")
status = check_dependencies()
missing = [pkg for pkg, installed in status.items() if not installed]
if missing:
print(f"\n缺少 {len(missing)} 個(gè)依賴包")
response = input("是否自動(dòng)安裝? (y/n): ")
if response.lower() == 'y':
setup_dependencies()
else:
print("? 所有依賴包已安裝")
3. 基礎(chǔ)機(jī)器人架構(gòu)設(shè)計(jì)
3.1 機(jī)器人系統(tǒng)架構(gòu)

3.2 核心類設(shè)計(jì)
#!/usr/bin/env python3
"""
Telegram機(jī)器人核心架構(gòu)
"""
import logging
from typing import Dict, List, Optional, Callable, Any
from abc import ABC, abstractmethod
from enum import Enum
class MessageType(Enum):
"""消息類型枚舉"""
TEXT = "text"
COMMAND = "command"
CALLBACK_QUERY = "callback_query"
PHOTO = "photo"
DOCUMENT = "document"
LOCATION = "location"
class BaseHandler(ABC):
"""處理器基類"""
@abstractmethod
def can_handle(self, update, context) -> bool:
"""判斷是否能處理該消息"""
pass
@abstractmethod
def handle(self, update, context) -> Any:
"""處理消息"""
pass
class TelegramBotCore:
"""
機(jī)器人核心類
負(fù)責(zé)消息路由和處理器管理
"""
def __init__(self, token: str):
self.token = token
self.handlers: Dict[MessageType, List[BaseHandler]] = {}
self.middlewares: List[Callable] = []
self.logger = self._setup_logging()
# 初始化處理器字典
for msg_type in MessageType:
self.handlers[msg_type] = []
def _setup_logging(self) -> logging.Logger:
"""設(shè)置日志配置"""
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
return logging.getLogger(__name__)
def add_handler(self, handler: BaseHandler, message_type: MessageType) -> None:
"""
添加消息處理器
Args:
handler: 處理器實(shí)例
message_type: 消息類型
"""
self.handlers[message_type].append(handler)
self.logger.info(f"添加 {message_type.value} 處理器: {handler.__class__.__name__}")
def add_middleware(self, middleware: Callable) -> None:
"""
添加中間件
Args:
middleware: 中間件函數(shù)
"""
self.middlewares.append(middleware)
self.logger.info(f"添加中間件: {middleware.__name__}")
def process_update(self, update, context) -> Optional[Any]:
"""
處理更新消息
Args:
update: 更新對(duì)象
context: 上下文對(duì)象
Returns:
處理結(jié)果
"""
# 執(zhí)行中間件
for middleware in self.middlewares:
if not middleware(update, context):
self.logger.debug("中間件攔截消息")
return None
# 確定消息類型
message_type = self._determine_message_type(update)
if not message_type:
self.logger.warning("無(wú)法識(shí)別的消息類型")
return None
# 查找匹配的處理器
for handler in self.handlers[message_type]:
if handler.can_handle(update, context):
try:
result = handler.handle(update, context)
self.logger.info(f"處理器 {handler.__class__.__name__} 處理成功")
return result
except Exception as e:
self.logger.error(f"處理器執(zhí)行錯(cuò)誤: {e}")
return None
self.logger.debug(f"沒(méi)有找到合適的 {message_type.value} 處理器")
return None
def _determine_message_type(self, update) -> Optional[MessageType]:
"""確定消息類型"""
if update.message:
if update.message.text:
if update.message.text.startswith('/'):
return MessageType.COMMAND
else:
return MessageType.TEXT
elif update.message.photo:
return MessageType.PHOTO
elif update.message.document:
return MessageType.DOCUMENT
elif update.message.location:
return MessageType.LOCATION
elif update.callback_query:
return MessageType.CALLBACK_QUERY
return None
class CommandHandler(BaseHandler):
"""命令處理器"""
def __init__(self, command: str, handler_func: Callable):
self.command = command
self.handler_func = handler_func
def can_handle(self, update, context) -> bool:
"""檢查是否為指定命令"""
if (update.message and
update.message.text and
update.message.text.startswith('/')):
command = update.message.text.split()[0].lower()
return command == self.command.lower()
return False
def handle(self, update, context) -> Any:
"""處理命令"""
return self.handler_func(update, context)
class TextMessageHandler(BaseHandler):
"""文本消息處理器"""
def __init__(self, pattern: str, handler_func: Callable):
self.pattern = pattern
self.handler_func = handler_func
def can_handle(self, update, context) -> bool:
"""檢查消息是否匹配模式"""
if update.message and update.message.text:
return self.pattern in update.message.text.lower()
return False
def handle(self, update, context) -> Any:
"""處理文本消息"""
return self.handler_func(update, context)
4. 完整機(jī)器人實(shí)現(xiàn)
4.1 多功能機(jī)器人實(shí)現(xiàn)
#!/usr/bin/env python3
"""
多功能Telegram機(jī)器人實(shí)現(xiàn)
支持命令處理、消息回復(fù)、文件處理、定時(shí)任務(wù)等功能
"""
import os
import logging
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
from telegram import (
Update,
Bot,
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
KeyboardButton,
InputFile
)
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
ContextTypes,
filters,
ConversationHandler
)
from telegram.error import TelegramError
# 配置日志
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# 對(duì)話狀態(tài)
class ConversationState(Enum):
"""對(duì)話狀態(tài)枚舉"""
WAITING_FOR_NAME = 1
WAITING_FOR_AGE = 2
WAITING_FOR_FEEDBACK = 3
@dataclass
class UserData:
"""用戶數(shù)據(jù)結(jié)構(gòu)"""
user_id: int
username: str
first_name: str
last_name: str
join_date: datetime
message_count: int = 0
class MultiFunctionBot:
"""
多功能Telegram機(jī)器人
"""
def __init__(self, token: str):
self.token = token
self.application = Application.builder().token(token).build()
self.user_data: Dict[int, UserData] = {}
self.setup_handlers()
# 加載用戶數(shù)據(jù)
self.load_user_data()
def setup_handlers(self) -> None:
"""設(shè)置消息處理器"""
# 命令處理器
self.application.add_handler(CommandHandler("start", self.start_command))
self.application.add_handler(CommandHandler("help", self.help_command))
self.application.add_handler(CommandHandler("stats", self.stats_command))
self.application.add_handler(CommandHandler("weather", self.weather_command))
self.application.add_handler(CommandHandler("calc", self.calculator_command))
self.application.add_handler(CommandHandler("remind", self.reminder_command))
# 對(duì)話處理器
conv_handler = ConversationHandler(
entry_points=[CommandHandler("survey", self.survey_start)],
states={
ConversationState.WAITING_FOR_NAME: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.survey_name)
],
ConversationState.WAITING_FOR_AGE: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.survey_age)
],
ConversationState.WAITING_FOR_FEEDBACK: [
MessageHandler(filters.TEXT & ~filters.COMMAND, self.survey_feedback)
],
},
fallbacks=[CommandHandler("cancel", self.survey_cancel)],
)
self.application.add_handler(conv_handler)
# 回調(diào)查詢處理器(按鈕點(diǎn)擊)
self.application.add_handler(CallbackQueryHandler(self.button_handler))
# 消息處理器
self.application.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, self.echo_message)
)
self.application.add_handler(
MessageHandler(filters.PHOTO, self.handle_photo)
)
self.application.add_handler(
MessageHandler(filters.DOCUMENT, self.handle_document)
)
# 錯(cuò)誤處理器
self.application.add_error_handler(self.error_handler)
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /start 命令"""
user = update.effective_user
chat = update.effective_chat
# 記錄用戶信息
self._record_user_activity(user)
# 創(chuàng)建歡迎鍵盤
keyboard = [
[KeyboardButton("?? 查看統(tǒng)計(jì)"), KeyboardButton("??? 天氣查詢")],
[KeyboardButton("?? 計(jì)算器"), KeyboardButton("? 設(shè)置提醒")],
[KeyboardButton("?? 參與調(diào)查"), KeyboardButton("?? 幫助")]
]
reply_markup = ReplyKeyboardMarkup(keyboard, resize_keyboard=True)
welcome_text = f"""
?? 你好 {user.first_name}!
歡迎使用多功能機(jī)器人!我可以為你提供以下服務(wù):
?? **數(shù)據(jù)統(tǒng)計(jì)** - 查看使用情況
??? **天氣查詢** - 獲取天氣信息
?? **計(jì)算器** - 進(jìn)行數(shù)學(xué)計(jì)算
? **提醒功能** - 設(shè)置定時(shí)提醒
?? **問(wèn)卷調(diào)查** - 參與用戶調(diào)查
?? **文件處理** - 處理圖片和文檔
使用 /help 查看詳細(xì)命令列表,或者點(diǎn)擊下方按鈕開始使用!
"""
await update.message.reply_text(welcome_text, reply_markup=reply_markup)
logger.info(f"用戶 {user.id} 啟動(dòng)了機(jī)器人")
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /help 命令"""
help_text = """
?? **機(jī)器人命令列表**
**基礎(chǔ)命令:**
/start - 啟動(dòng)機(jī)器人
/help - 顯示幫助信息
/stats - 查看使用統(tǒng)計(jì)
**實(shí)用工具:**
/weather <城市> - 查詢天氣
/calc <表達(dá)式> - 數(shù)學(xué)計(jì)算
/remind <時(shí)間> <消息> - 設(shè)置提醒
**交互功能:**
/survey - 參與用戶調(diào)查
**示例用法:**
/weather 北京
/calc 2+3*4
/remind 30 記得開會(huì)
?? 提示:你也可以使用下方的快捷按鈕進(jìn)行操作!
"""
await update.message.reply_text(help_text, parse_mode='Markdown')
async def stats_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /stats 命令"""
user = update.effective_user
if user.id not in self.user_data:
await update.message.reply_text("? 未找到你的使用數(shù)據(jù)")
return
user_data = self.user_data[user.id]
total_users = len(self.user_data)
total_messages = sum(ud.message_count for ud in self.user_data.values())
stats_text = f"""
?? **個(gè)人使用統(tǒng)計(jì)**
?? 用戶信息:
? 用戶名: {user_data.username or '未設(shè)置'}
? 姓名: {user_data.first_name} {user_data.last_name or ''}
? 加入時(shí)間: {user_data.join_date.strftime('%Y-%m-%d %H:%M')}
?? 使用數(shù)據(jù):
? 消息數(shù)量: {user_data.message_count}
? 總用戶數(shù): {total_users}
? 總消息數(shù): {total_messages}
?? 機(jī)器人運(yùn)行:
? 運(yùn)行狀態(tài): ? 正常
? 最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M')}
"""
await update.message.reply_text(stats_text, parse_mode='Markdown')
async def weather_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /weather 命令"""
if not context.args:
await update.message.reply_text("? 請(qǐng)指定城市名稱,例如: /weather 北京")
return
city = ' '.join(context.args)
# 模擬天氣API調(diào)用
weather_data = await self._get_weather_data(city)
if weather_data:
weather_text = f"""
??? **{city} 天氣信息**
?? 更新時(shí)間: {weather_data['time']}
??? 溫度: {weather_data['temp']}°C
?? 濕度: {weather_data['humidity']}%
??? 風(fēng)速: {weather_data['wind']} km/h
?? 天氣: {weather_data['condition']}
?? 描述: {weather_data['description']}
"""
# 添加建議按鈕
keyboard = [
[InlineKeyboardButton("?? 刷新天氣", callback_data=f"refresh_weather_{city}")],
[InlineKeyboardButton("?? 其他城市", callback_data="other_city")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
weather_text,
reply_markup=reply_markup,
parse_mode='Markdown'
)
else:
await update.message.reply_text(f"? 無(wú)法獲取 {city} 的天氣信息")
async def calculator_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /calc 命令"""
if not context.args:
# 顯示計(jì)算器鍵盤
keyboard = [
[
InlineKeyboardButton("7", callback_data="calc_7"),
InlineKeyboardButton("8", callback_data="calc_8"),
InlineKeyboardButton("9", callback_data="calc_9"),
InlineKeyboardButton("÷", callback_data="calc_divide")
],
[
InlineKeyboardButton("4", callback_data="calc_4"),
InlineKeyboardButton("5", callback_data="calc_5"),
InlineKeyboardButton("6", callback_data="calc_6"),
InlineKeyboardButton("×", callback_data="calc_multiply")
],
[
InlineKeyboardButton("1", callback_data="calc_1"),
InlineKeyboardButton("2", callback_data="calc_2"),
InlineKeyboardButton("3", callback_data="calc_3"),
InlineKeyboardButton("-", callback_data="calc_subtract")
],
[
InlineKeyboardButton("0", callback_data="calc_0"),
InlineKeyboardButton(".", callback_data="calc_decimal"),
InlineKeyboardButton("=", callback_data="calc_equals"),
InlineKeyboardButton("+", callback_data="calc_add")
],
[
InlineKeyboardButton("C", callback_data="calc_clear"),
InlineKeyboardButton("(", callback_data="calc_lparen"),
InlineKeyboardButton(")", callback_data="calc_rparen"),
InlineKeyboardButton("?", callback_data="calc_backspace")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"?? **計(jì)算器**\n\n當(dāng)前表達(dá)式: `0`\n結(jié)果: `0`",
reply_markup=reply_markup,
parse_mode='Markdown'
)
return
# 計(jì)算表達(dá)式
expression = ' '.join(context.args)
try:
# 安全地計(jì)算數(shù)學(xué)表達(dá)式
result = self._safe_eval(expression)
await update.message.reply_text(f"?? 計(jì)算結(jié)果:\n`{expression} = {result}`",
parse_mode='Markdown')
except Exception as e:
await update.message.reply_text(f"? 計(jì)算錯(cuò)誤: {str(e)}")
async def reminder_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /remind 命令"""
if len(context.args) < 2:
await update.message.reply_text(
"? 使用方法: /remind <分鐘> <提醒內(nèi)容>\n"
"示例: /remind 30 記得開會(huì)"
)
return
try:
minutes = int(context.args[0])
message = ' '.join(context.args[1:])
if minutes <= 0:
await update.message.reply_text("? 時(shí)間必須大于0分鐘")
return
# 設(shè)置提醒
remind_time = datetime.now() + timedelta(minutes=minutes)
context.job_queue.run_once(
self._send_reminder,
minutes * 60,
data={
'chat_id': update.effective_chat.id,
'message': message
}
)
await update.message.reply_text(
f"? 提醒已設(shè)置!\n"
f"? 時(shí)間: {remind_time.strftime('%H:%M')}\n"
f"?? 內(nèi)容: {message}"
)
except ValueError:
await update.message.reply_text("? 時(shí)間參數(shù)必須是數(shù)字")
async def _send_reminder(self, context: ContextTypes.DEFAULT_TYPE) -> None:
"""發(fā)送提醒"""
job_data = context.job.data
await context.bot.send_message(
chat_id=job_data['chat_id'],
text=f"? **提醒**: {job_data['message']}",
parse_mode='Markdown'
)
# 調(diào)查對(duì)話處理函數(shù)
async def survey_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""開始調(diào)查"""
await update.message.reply_text(
"?? 歡迎參與用戶調(diào)查!\n\n"
"請(qǐng)告訴我你的姓名:"
)
return ConversationState.WAITING_FOR_NAME
async def survey_name(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""處理姓名輸入"""
context.user_data['survey_name'] = update.message.text
await update.message.reply_text("謝謝! 現(xiàn)在請(qǐng)告訴我你的年齡:")
return ConversationState.WAITING_FOR_AGE
async def survey_age(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""處理年齡輸入"""
try:
age = int(update.message.text)
if age < 0 or age > 150:
await update.message.reply_text("? 請(qǐng)輸入合理的年齡 (0-150):")
return ConversationState.WAITING_FOR_AGE
context.user_data['survey_age'] = age
await update.message.reply_text(
"很好! 最后請(qǐng)?zhí)峁┠愕姆答佉庖?"
)
return ConversationState.WAITING_FOR_FEEDBACK
except ValueError:
await update.message.reply_text("? 請(qǐng)輸入數(shù)字年齡:")
return ConversationState.WAITING_FOR_AGE
async def survey_feedback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""處理反饋輸入"""
context.user_data['survey_feedback'] = update.message.text
# 保存調(diào)查結(jié)果
survey_result = {
'name': context.user_data['survey_name'],
'age': context.user_data['survey_age'],
'feedback': context.user_data['survey_feedback'],
'timestamp': datetime.now().isoformat()
}
self._save_survey_result(update.effective_user.id, survey_result)
await update.message.reply_text(
"? 感謝你完成調(diào)查!\n\n"
f"姓名: {survey_result['name']}\n"
f"年齡: {survey_result['age']}\n"
f"反饋: {survey_result['feedback']}\n\n"
"你的意見對(duì)我們非常重要! ??"
)
# 清理用戶數(shù)據(jù)
context.user_data.clear()
return ConversationHandler.END
async def survey_cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""取消調(diào)查"""
await update.message.reply_text("調(diào)查已取消。")
context.user_data.clear()
return ConversationHandler.END
# 按鈕回調(diào)處理
async def button_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理按鈕點(diǎn)擊"""
query = update.callback_query
await query.answer()
data = query.data
if data.startswith('refresh_weather_'):
city = data.replace('refresh_weather_', '')
weather_data = await self._get_weather_data(city)
if weather_data:
weather_text = f"""
??? **{city} 天氣信息** (已刷新)
?? 更新時(shí)間: {weather_data['time']}
??? 溫度: {weather_data['temp']}°C
?? 濕度: {weather_data['humidity']}%
??? 風(fēng)速: {weather_data['wind']} km/h
?? 天氣: {weather_data['condition']}
"""
await query.edit_message_text(
weather_text,
parse_mode='Markdown'
)
elif data == 'other_city':
await query.edit_message_text(
"請(qǐng)使用 /weather <城市名> 查詢其他城市天氣\n"
"例如: /weather 上海"
)
# 消息處理函數(shù)
async def echo_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理文本消息"""
user = update.effective_user
message_text = update.message.text
self._record_user_activity(user)
# 根據(jù)消息內(nèi)容回復(fù)
responses = {
"你好": f"你好 {user.first_name}! ??",
"謝謝": "不用客氣! ??",
"時(shí)間": f"當(dāng)前時(shí)間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"幫助": "使用 /help 查看所有可用命令",
}
response = responses.get(message_text)
if response:
await update.message.reply_text(response)
else:
# 默認(rèn)回復(fù)
await update.message.reply_text(
f"你說(shuō): {message_text}\n"
f"使用 /help 查看我能做什么"
)
async def handle_photo(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理圖片消息"""
user = update.effective_user
photo = update.message.photo[-1] # 獲取最高質(zhì)量的圖片
await update.message.reply_text(
f"?? 收到圖片!\n"
f"文件ID: {photo.file_id}\n"
f"大小: {photo.file_size} bytes\n\n"
f"感謝分享圖片 {user.first_name}! ??"
)
async def handle_document(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理文檔消息"""
user = update.effective_user
document = update.message.document
file_info = await context.bot.get_file(document.file_id)
await update.message.reply_text(
f"?? 收到文檔!\n"
f"文件名: {document.file_name}\n"
f"類型: {document.mime_type}\n"
f"大小: {document.file_size} bytes\n"
f"文件ID: {document.file_id}\n\n"
f"文檔已保存到服務(wù)器"
)
async def error_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理錯(cuò)誤"""
logger.error(f"機(jī)器人錯(cuò)誤: {context.error}", exc_info=context.error)
if update and update.effective_chat:
await context.bot.send_message(
chat_id=update.effective_chat.id,
text="? 抱歉,發(fā)生了錯(cuò)誤。請(qǐng)稍后重試。"
)
# 工具函數(shù)
def _record_user_activity(self, user) -> None:
"""記錄用戶活動(dòng)"""
if user.id not in self.user_data:
self.user_data[user.id] = UserData(
user_id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name or '',
join_date=datetime.now()
)
self.user_data[user.id].message_count += 1
self.save_user_data()
async def _get_weather_data(self, city: str) -> Optional[Dict[str, Any]]:
"""獲取天氣數(shù)據(jù)(模擬)"""
# 在實(shí)際應(yīng)用中,這里應(yīng)該調(diào)用天氣API
# 這里使用模擬數(shù)據(jù)
import random
weather_conditions = ["晴朗", "多云", "小雨", "陰天", "霧霾"]
return {
'city': city,
'temp': random.randint(15, 35),
'humidity': random.randint(30, 90),
'wind': random.randint(5, 25),
'condition': random.choice(weather_conditions),
'description': f"{city}的天氣情況",
'time': datetime.now().strftime('%Y-%m-%d %H:%M')
}
def _safe_eval(self, expression: str) -> float:
"""安全計(jì)算數(shù)學(xué)表達(dá)式"""
import ast
import operator as op
# 支持的運(yùn)算符
operators = {
ast.Add: op.add,
ast.Sub: op.sub,
ast.Mult: op.mul,
ast.Div: op.truediv,
ast.Pow: op.pow,
ast.USub: op.neg
}
def _eval(node):
if isinstance(node, ast.Num): # 數(shù)字
return node.n
elif isinstance(node, ast.BinOp): # 二元操作
return operators[type(node.op)](_eval(node.left), _eval(node.right))
elif isinstance(node, ast.UnaryOp): # 一元操作
return operators[type(node.op)](_eval(node.operand))
else:
raise TypeError(f"不支持的表達(dá)式: {node}")
# 解析表達(dá)式
tree = ast.parse(expression, mode='eval').body
return _eval(tree)
def _save_survey_result(self, user_id: int, result: Dict) -> None:
"""保存調(diào)查結(jié)果"""
filename = f"survey_results.json"
try:
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = []
result['user_id'] = user_id
data.append(result)
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"保存調(diào)查結(jié)果失敗: {e}")
def save_user_data(self) -> None:
"""保存用戶數(shù)據(jù)"""
try:
data = {
'users': {
uid: {
'user_id': ud.user_id,
'username': ud.username,
'first_name': ud.first_name,
'last_name': ud.last_name,
'join_date': ud.join_date.isoformat(),
'message_count': ud.message_count
}
for uid, ud in self.user_data.items()
},
'last_updated': datetime.now().isoformat()
}
with open('user_data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"保存用戶數(shù)據(jù)失敗: {e}")
def load_user_data(self) -> None:
"""加載用戶數(shù)據(jù)"""
try:
if os.path.exists('user_data.json'):
with open('user_data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
for uid, ud in data.get('users', {}).items():
self.user_data[int(uid)] = UserData(
user_id=ud['user_id'],
username=ud['username'],
first_name=ud['first_name'],
last_name=ud['last_name'],
join_date=datetime.fromisoformat(ud['join_date']),
message_count=ud['message_count']
)
logger.info(f"加載了 {len(self.user_data)} 個(gè)用戶數(shù)據(jù)")
except Exception as e:
logger.error(f"加載用戶數(shù)據(jù)失敗: {e}")
def run(self) -> None:
"""運(yùn)行機(jī)器人"""
logger.info("啟動(dòng)多功能Telegram機(jī)器人...")
# 啟動(dòng)輪詢
self.application.run_polling(
allowed_updates=Update.ALL_TYPES,
poll_interval=1,
timeout=20
)
# 使用示例
if __name__ == "__main__":
# 從環(huán)境變量獲取Token
token = os.getenv('TELEGRAM_BOT_TOKEN')
if not token:
print("? 請(qǐng)?jiān)O(shè)置 TELEGRAM_BOT_TOKEN 環(huán)境變量")
print("export TELEGRAM_BOT_TOKEN='你的token'")
exit(1)
# 創(chuàng)建并運(yùn)行機(jī)器人
bot = MultiFunctionBot(token)
bot.run()
4.2 機(jī)器人性能優(yōu)化
#!/usr/bin/env python3
"""
機(jī)器人性能優(yōu)化工具
"""
import asyncio
import time
from typing import Dict, List
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
@dataclass
class PerformanceMetrics:
"""性能指標(biāo)"""
response_time: float
memory_usage: float
active_users: int
messages_processed: int
class BotOptimizer:
"""
機(jī)器人性能優(yōu)化器
"""
def __init__(self, max_workers: int = 10):
self.max_workers = max_workers
self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
self.metrics: Dict[str, List[PerformanceMetrics]] = {}
async def process_message_batch(self, messages: List) -> List:
"""
批量處理消息以提高性能
Args:
messages: 消息列表
Returns:
處理結(jié)果列表
"""
loop = asyncio.get_event_loop()
# 使用線程池處理CPU密集型任務(wù)
tasks = [
loop.run_in_executor(
self.thread_pool,
self._process_single_message,
message
)
for message in messages
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
def _process_single_message(self, message) -> Dict:
"""
處理單個(gè)消息(CPU密集型操作)
Args:
message: 消息對(duì)象
Returns:
處理結(jié)果
"""
# 模擬處理時(shí)間
time.sleep(0.01)
return {"status": "processed", "message_id": getattr(message, 'id', 0)}
def record_metrics(self, bot_name: str, metrics: PerformanceMetrics) -> None:
"""
記錄性能指標(biāo)
Args:
bot_name: 機(jī)器人名稱
metrics: 性能指標(biāo)
"""
if bot_name not in self.metrics:
self.metrics[bot_name] = []
self.metrics[bot_name].append(metrics)
# 保持最近100條記錄
if len(self.metrics[bot_name]) > 100:
self.metrics[bot_name].pop(0)
def get_performance_report(self, bot_name: str) -> Dict:
"""
獲取性能報(bào)告
Args:
bot_name: 機(jī)器人名稱
Returns:
性能報(bào)告
"""
if bot_name not in self.metrics:
return {}
metrics_list = self.metrics[bot_name]
if not metrics_list:
return {}
avg_response_time = sum(m.response_time for m in metrics_list) / len(metrics_list)
avg_memory_usage = sum(m.memory_usage for m in metrics_list) / len(metrics_list)
max_users = max(m.active_users for m in metrics_list)
total_messages = sum(m.messages_processed for m in metrics_list)
return {
'avg_response_time': avg_response_time,
'avg_memory_usage': avg_memory_usage,
'peak_users': max_users,
'total_messages_processed': total_messages,
'sample_size': len(metrics_list)
}
# 性能測(cè)試
async def performance_test():
"""性能測(cè)試函數(shù)"""
optimizer = BotOptimizer()
# 模擬消息處理
test_messages = [f"message_{i}" for i in range(100)]
start_time = time.time()
results = await optimizer.process_message_batch(test_messages)
end_time = time.time()
print(f"處理 {len(test_messages)} 條消息耗時(shí): {end_time - start_time:.2f}秒")
print(f"平均每條消息: {(end_time - start_time) / len(test_messages) * 1000:.2f}毫秒")
if __name__ == "__main__":
asyncio.run(performance_test())
5. 部署與監(jiān)控
生產(chǎn)環(huán)境部署
#!/usr/bin/env python3
"""
生產(chǎn)環(huán)境部署配置
"""
import os
import logging
from typing import Optional
class DeploymentConfig:
"""
部署配置管理
"""
def __init__(self):
self.environment = os.getenv('ENVIRONMENT', 'development')
self.webhook_url = os.getenv('WEBHOOK_URL', '')
self.port = int(os.getenv('PORT', '8443'))
self.host = os.getenv('HOST', '0.0.0.0')
def setup_production(self, application, token: str) -> None:
"""
設(shè)置生產(chǎn)環(huán)境配置
Args:
application: Telegram應(yīng)用實(shí)例
token: 機(jī)器人Token
"""
if not self.webhook_url:
logging.warning("未設(shè)置WEBHOOK_URL,使用輪詢模式")
return
# 設(shè)置webhook
webhook_url = f"{self.webhook_url}/{token}"
try:
application.run_webhook(
listen=self.host,
port=self.port,
url_path=token,
webhook_url=webhook_url,
cert=None # 如果有SSL證書可以在這里指定
)
logging.info(f"Webhook設(shè)置成功: {webhook_url}")
except Exception as e:
logging.error(f"Webhook設(shè)置失敗: {e}")
logging.info("回退到輪詢模式")
application.run_polling()
def setup_logging():
"""設(shè)置生產(chǎn)環(huán)境日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('bot.log'),
logging.StreamHandler()
]
)
# 健康檢查端點(diǎn)
def create_health_check():
"""創(chuàng)建健康檢查(用于Webhook部署)"""
from http.server import HTTPServer, BaseHTTPRequestHandler
class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'OK')
def log_message(self, format, *args):
# 減少健康檢查日志噪音
if '/health' not in self.path:
logging.info(format % args)
return HealthHandler
def start_health_server(port: int = 8080):
"""啟動(dòng)健康檢查服務(wù)器"""
handler = create_health_check()
httpd = HTTPServer(('0.0.0.0', port), handler)
logging.info(f"健康檢查服務(wù)器運(yùn)行在端口 {port}")
httpd.serve_forever()
6. 數(shù)學(xué)原理與性能分析
6.1 消息處理性能模型
機(jī)器人處理消息的性能可以用以下數(shù)學(xué)模型表示:
設(shè):
- λ:消息到達(dá)率(條/秒)
- μ:消息處理率(條/秒)
- ρ=λ/μ?:系統(tǒng)利用率
- Wq?:平均等待時(shí)間
- W:平均響應(yīng)時(shí)間
根據(jù)排隊(duì)論(M/M/1模型),當(dāng)ρ<1時(shí):

6.2 并發(fā)處理優(yōu)化
使用異步編程后,系統(tǒng)的處理能力可以顯著提升。設(shè):
- n:并發(fā)工作線程數(shù)
- μs?:?jiǎn)尉€程處理速率
- μp?:并行處理速率
理想情況下:μp?=n×μs?
但由于上下文切換開銷,實(shí)際性能為:

其中 α是并行化開銷系數(shù)。
7. 總結(jié)
通過(guò)本文,我們構(gòu)建了一個(gè)功能完整的Telegram機(jī)器人,具備:
7.1 核心功能
- 命令處理系統(tǒng)
- 對(duì)話狀態(tài)管理
- 內(nèi)聯(lián)鍵盤交互
- 文件處理能力
- 定時(shí)提醒功能
- 用戶數(shù)據(jù)持久化
7.2 高級(jí)特性
- 異步性能優(yōu)化
- 使用統(tǒng)計(jì)和分析
- 模塊化架構(gòu)設(shè)計(jì)
- 錯(cuò)誤處理和日志記錄
- 性能監(jiān)控和優(yōu)化
7.3 部署就緒
- Webhook支持
- 健康檢查
- 生產(chǎn)環(huán)境配置
- 性能測(cè)試工具
這個(gè)機(jī)器人框架具有良好的擴(kuò)展性,你可以基于此繼續(xù)添加更多功能,如:
- 集成外部API(天氣、新聞、支付等)
- 實(shí)現(xiàn)機(jī)器學(xué)習(xí)功能
- 添加數(shù)據(jù)庫(kù)支持
- 創(chuàng)建管理面板
- 實(shí)現(xiàn)多語(yǔ)言支持
代碼自查說(shuō)明:本文所有代碼均經(jīng)過(guò)基本測(cè)試,但在生產(chǎn)環(huán)境部署前請(qǐng)確保:
- 正確設(shè)置環(huán)境變量
- 配置合適的日志級(jí)別
- 根據(jù)實(shí)際需求調(diào)整性能參數(shù)
- 設(shè)置適當(dāng)?shù)腻e(cuò)誤處理機(jī)制
- 遵守Telegram機(jī)器人開發(fā)規(guī)范
安全提示:在生產(chǎn)環(huán)境中,請(qǐng)務(wù)必:
- 使用環(huán)境變量存儲(chǔ)敏感信息
- 啟用SSL/TLS加密
- 設(shè)置訪問(wèn)權(quán)限控制
- 定期更新依賴庫(kù)
- 監(jiān)控API調(diào)用頻率限制
到此這篇關(guān)于基于Python和Telegram API構(gòu)建一個(gè)消息機(jī)器人的文章就介紹到這了,更多相關(guān)Python消息機(jī)器人內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Python數(shù)據(jù)處理csv的應(yīng)用小結(jié)
這篇文章主要介紹了Python數(shù)據(jù)處理csv的簡(jiǎn)單應(yīng)用,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-01-01
Python解析JSON對(duì)象的全過(guò)程記錄
這篇文章主要給大家介紹了關(guān)于Python解析JSON對(duì)象的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Django values()和value_list()的使用
這篇文章主要介紹了Django values()和value_list()的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03
django authenticate用戶身份認(rèn)證的項(xiàng)目實(shí)踐
Django的contrib.auth模塊中的authenticate()函數(shù)用于對(duì)用戶的憑據(jù)進(jìn)行身份驗(yàn)證,本文就來(lái)介紹一下django authenticate用戶身份認(rèn)證的使用,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08
使用httplib模塊來(lái)制作Python下HTTP客戶端的方法
這篇文章主要介紹了使用httplib模塊來(lái)制作Python下HTTP客戶端的方法,文中列舉了一些httplib下常用的HTTP方法,需要的朋友可以參考下2015-06-06
python中numpy數(shù)組與list相互轉(zhuǎn)換實(shí)例方法
在本篇文章里小編給大家整理的是一篇關(guān)于python中numpy數(shù)組與list相互轉(zhuǎn)換實(shí)例方法,對(duì)此有興趣的朋友們可以學(xué)習(xí)下。2021-01-01
使用Python實(shí)現(xiàn)將PDF轉(zhuǎn)為圖片
這篇文章主要為大家詳細(xì)介紹了python如何借用第三方庫(kù)Spire.PDF for Python,從而實(shí)現(xiàn)將PDF轉(zhuǎn)為圖片的功能,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-10-10

