Turning an Old Phone into a Surveillance Camera with Python and OpenCV
1. Introduction
With technology moving as fast as it does, most of us have a pile of retired phones sitting in a drawer. These devices can no longer keep up with the latest flagships, but they still have perfectly usable cameras, processors, and network connectivity. By some estimates, more than 150 million smartphones are left idle or discarded worldwide every year, which is both a waste of resources and a burden on the environment.
At the same time, demand for security monitoring in homes and offices keeps growing. Traditional surveillance cameras are expensive, fiddly to install, and can carry privacy risks of their own. Building a monitoring system from an old phone is not only cheap, it also puts idle hardware back to work.
This article walks through how to use Python and OpenCV to turn an old phone into a fully functional surveillance camera with real-time monitoring, motion detection, face detection, automatic recording, and other advanced features. The approach has the following advantages:
- Extremely low cost: it reuses an idle device, so no extra hardware is needed
- Highly flexible: the monitoring features can be customized to your needs
- Privacy friendly: data stays on local storage, avoiding cloud privacy risks
- Easy to extend: it builds on the Python ecosystem, so new features are easy to add
2. System Architecture and How It Works
2.1 Overall Architecture
The monitoring system consists of three main parts: a video stream server on the phone, a processing program on the PC, and an optional cloud notification service.

2.2 How the Video Stream Is Transmitted
An IP camera app exposes the phone's camera as an RTSP or HTTP stream; the PC captures that stream with OpenCV and processes it. The whole pipeline follows a client-server architecture:
- Phone: runs the IP camera app, which encodes the camera feed (H.264/H.265, or MJPEG depending on the app)
- Network: the encoded stream is carried over WiFi
- PC: receives and decodes the stream, then applies computer vision algorithms to it
The most useful quantity to reason about is the per-frame processing time:
t_processing = t_capture + t_process + t_display
where t_capture is the time to pull and decode a frame from the network stream, t_process is the computer vision work, and t_display covers rendering and any output such as writing to disk.
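If you want to see where the time actually goes on your own hardware, a few calls to time.time() around each stage are enough. The sketch below is purely illustrative and assumes the environment from section 3 is already set up: the stream URL is a placeholder, and the grayscale conversion merely stands in for whatever per-frame processing you end up doing.
# timing_check.py  (illustrative; replace the URL with your phone's actual stream URL)
import time
import cv2

cap = cv2.VideoCapture("http://192.168.1.100:8080/video")

while True:
    t0 = time.time()
    ret, frame = cap.read()                           # t_capture
    if not ret:
        break
    t1 = time.time()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)    # stand-in for t_process
    t2 = time.time()
    cv2.imshow("frame", frame)                        # t_display (imshow + GUI pump)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    t3 = time.time()
    print(f"capture {t1 - t0:.3f}s  process {t2 - t1:.3f}s  display {t3 - t2:.3f}s")

cap.release()
cv2.destroyAllWindows()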
3. Environment Setup and Dependency Installation
3.1 Phone Setup
Android setup
1. Install an IP camera app
- Recommended: IP Webcam (free and feature-rich)
- Alternatives: DroidCam, Alfred Camera
2. Configuration steps:
- Download and install the IP Webcam app
- Open the app and scroll down to the "Server" section
- Tap the "Start server" button
- Note the IP address and port that are displayed (typically http://192.168.x.x:8080)
3. Advanced settings:
- Video quality: 720p is a good balance between quality and performance
- Frame rate: 15-30 fps
- Audio: enable or disable as needed
iPhone setup
- Install an app such as iVCam or EpocCam
- Make sure the phone and the computer are on the same WiFi network
- Start the app and note the connection details
3.2 PC Setup
Installing the Python environment
# Create a virtual environment (recommended)
python -m venv surveillance_env
source surveillance_env/bin/activate   # Linux/Mac
# or: surveillance_env\Scripts\activate   # Windows

# Install the core dependencies
pip install opencv-python
pip install numpy
pip install pillow
pip install requests
pip install twilio    # for SMS notifications (optional)
# Note: smtplib (used later for email notifications) ships with the Python
# standard library and does not need to be installed with pip.
Verifying the installation
Create a small test script to confirm that the environment is configured correctly:
# test_environment.py
import cv2
import numpy as np
import sys

def test_environment():
    """Check that the environment is set up correctly."""
    print("Python version:", sys.version)
    print("OpenCV version:", cv2.__version__)
    print("NumPy version:", np.__version__)
    # Exercise basic OpenCV functionality
    try:
        # Create a random test image
        test_image = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8)
        # Run a couple of simple image-processing operations
        gray = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        print("[OK] OpenCV image processing works")
        print("[OK] NumPy array operations work")
        print("Environment test passed!")
    except Exception as e:
        print(f"Environment test failed: {e}")

if __name__ == "__main__":
    test_environment()
4. Basic Video Stream Capture
4.1 A Simple Stream Capture Program
Let's start with the most basic stream capture; it is the key step for verifying the connection and the core plumbing.
# basic_stream.py
import cv2
import numpy as np
import time
class BasicCameraStream:
"""
基礎(chǔ)攝像頭流捕獲類
"""
def __init__(self, stream_url):
"""
初始化攝像頭流
參數(shù):
stream_url (str): 視頻流URL
"""
self.stream_url = stream_url
self.cap = None
self.is_connected = False
def connect(self, timeout=30):
"""
連接到視頻流
參數(shù):
timeout (int): 連接超時時間(秒)
返回:
bool: 連接是否成功
"""
print(f"嘗試連接到: {self.stream_url}")
self.cap = cv2.VideoCapture(self.stream_url)
start_time = time.time()
while not self.is_connected and (time.time() - start_time) < timeout:
ret, frame = self.cap.read()
if ret and frame is not None:
self.is_connected = True
print("連接成功!")
break
time.sleep(0.1)
return self.is_connected
def read_frame(self):
"""
讀取一幀圖像
返回:
tuple: (success, frame)
"""
if not self.is_connected:
return False, None
ret, frame = self.cap.read()
return ret, frame
    def display_stream(self, window_name="Camera Stream"):
"""
顯示實時視頻流
參數(shù):
window_name (str): 窗口名稱
"""
if not self.connect():
print("連接失敗,請檢查URL和網(wǎng)絡(luò)連接")
return
print("按 'q' 鍵退出顯示")
frame_count = 0
start_time = time.time()
while True:
ret, frame = self.read_frame()
if not ret:
print("讀取幀失敗")
break
# 計算并顯示FPS
frame_count += 1
elapsed_time = time.time() - start_time
if elapsed_time > 0:
fps = frame_count / elapsed_time
cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# 顯示幀
cv2.imshow(window_name, frame)
# 按'q'退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
self.release()
cv2.destroyAllWindows()
def release(self):
"""釋放資源"""
if self.cap is not None:
self.cap.release()
self.is_connected = False
# 使用示例
if __name__ == "__main__":
# 常見的視頻流URL格式
stream_urls = [
# IP Webcam默認(rèn)URL
"http://192.168.1.100:8080/video",
# MJPEG流
"http://192.168.1.100:8080/videofeed",
# RTSP流(某些應(yīng)用使用)
"rtsp://192.168.1.100:8080/h264_ulaw.sdp"
]
# 替換為你的手機IP和端口
your_phone_ip = "192.168.1.100" # 修改為實際IP
stream_url = f"http://{your_phone_ip}:8080/video"
stream = BasicCameraStream(stream_url)
stream.display_stream()
4.2 An Enhanced Capture Class with Support for Multiple Stream Formats
# enhanced_stream.py
import cv2
import time
import threading
from queue import Queue
import urllib.request
import urllib.error
class EnhancedCameraStream:
"""
增強版攝像頭流捕獲類,支持多種協(xié)議和自動重連
"""
def __init__(self, stream_url, buffer_size=128, timeout=10):
"""
初始化增強攝像頭流
參數(shù):
stream_url (str): 視頻流URL
buffer_size (int): 幀緩沖區(qū)大小
timeout (int): 連接超時時間(秒)
"""
self.stream_url = stream_url
self.buffer_size = buffer_size
self.timeout = timeout
self.frame_queue = Queue(maxsize=buffer_size)
self.running = False
self.thread = None
self.current_frame = None
self.frame_count = 0
self.last_frame_time = 0
self.fps = 0
def start(self):
"""開始捕獲視頻流"""
if self.running:
print("視頻流已經(jīng)在運行中")
return
self.running = True
self.thread = threading.Thread(target=self._capture_frames)
self.thread.daemon = True
self.thread.start()
print("視頻流捕獲已啟動")
def stop(self):
"""停止捕獲視頻流"""
self.running = False
if self.thread is not None:
self.thread.join(timeout=5)
print("視頻流捕獲已停止")
def _capture_frames(self):
"""在單獨線程中捕獲幀"""
cap = cv2.VideoCapture(self.stream_url)
# 設(shè)置緩沖大小以減少延遲
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
last_success_time = time.time()
while self.running:
ret, frame = cap.read()
if ret and frame is not None:
# 更新成功時間
last_success_time = time.time()
# 計算FPS
current_time = time.time()
if self.last_frame_time > 0:
self.fps = 1.0 / (current_time - self.last_frame_time)
self.last_frame_time = current_time
# 更新當(dāng)前幀
self.current_frame = frame.copy()
self.frame_count += 1
# 將幀放入隊列(如果隊列已滿,移除最舊的幀)
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except:
pass
self.frame_queue.put(frame)
else:
# 檢查是否需要重連
if time.time() - last_success_time > self.timeout:
print("視頻流中斷,嘗試重新連接...")
cap.release()
time.sleep(2)
cap = cv2.VideoCapture(self.stream_url)
last_success_time = time.time()
time.sleep(0.001) # 小延遲以避免過度占用CPU
cap.release()
def read(self):
"""
讀取當(dāng)前幀
返回:
tuple: (success, frame)
"""
if self.current_frame is None:
return False, None
return True, self.current_frame.copy()
def get_frame_from_queue(self, timeout=1.0):
"""
從隊列獲取幀
參數(shù):
timeout (float): 超時時間
返回:
frame or None: 獲取到的幀
"""
try:
return self.frame_queue.get(timeout=timeout)
except:
return None
def is_connected(self):
"""檢查是否連接成功"""
return self.current_frame is not None and self.fps > 0
def get_status(self):
"""獲取流狀態(tài)"""
return {
'connected': self.is_connected(),
'fps': self.fps,
'frame_count': self.frame_count,
'queue_size': self.frame_queue.qsize()
}
def test_connection(ip_address, port=8080):
    """
    Test connectivity to the phone camera.
    Parameters:
        ip_address (str): the phone's IP address
        port (int): the port number
    Returns:
        str or None: the first working stream URL, or None if every attempt fails
    """
test_urls = [
f"http://{ip_address}:{port}/video",
f"http://{ip_address}:{port}/videofeed",
f"http://{ip_address}:{port}"
]
for url in test_urls:
try:
response = urllib.request.urlopen(url, timeout=5)
if response.getcode() == 200:
print(f"? 連接成功: {url}")
return url
except urllib.error.URLError:
continue
except Exception as e:
continue
print("? 所有連接嘗試都失敗")
return None
# 使用示例
if __name__ == "__main__":
# 測試連接
phone_ip = "192.168.1.100" # 替換為實際IP
working_url = test_connection(phone_ip)
if working_url:
stream = EnhancedCameraStream(working_url)
stream.start()
# 等待連接建立
time.sleep(3)
try:
while True:
success, frame = stream.read()
if success:
status = stream.get_status()
cv2.putText(frame, f"FPS: {status['fps']:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(frame, f"Frames: {status['frame_count']}", (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow("Enhanced Stream", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
stream.stop()
cv2.destroyAllWindows()
5. Motion Detection
Motion detection is the core feature of a surveillance system: it lets the system trigger recording or alerts automatically whenever the scene changes.
Frame-differencing motion detection
# motion_detector.py
import cv2
import numpy as np
import time
from datetime import datetime
import os

# Stream class from section 4.2 (used by the demo at the bottom of this file)
from enhanced_stream import EnhancedCameraStream
class MotionDetector:
"""
運動檢測器類
"""
def __init__(self, min_area=500, threshold=25, blur_kernel=(5, 5)):
"""
初始化運動檢測器
參數(shù):
min_area (int): 最小運動區(qū)域面積(像素)
threshold (int): 二值化閾值
blur_kernel (tuple): 高斯模糊核大小
"""
self.min_area = min_area
self.threshold = threshold
self.blur_kernel = blur_kernel
# 狀態(tài)變量
self.previous_frame = None
self.motion_detected = False
self.motion_start_time = None
self.motion_counter = 0
def preprocess_frame(self, frame):
"""
預(yù)處理幀
參數(shù):
frame: 輸入幀
返回:
處理后的灰度幀
"""
# 轉(zhuǎn)換為灰度圖
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 應(yīng)用高斯模糊減少噪聲
blurred = cv2.GaussianBlur(gray, self.blur_kernel, 0)
return blurred
def detect_motion(self, current_frame):
"""
檢測運動
參數(shù):
current_frame: 當(dāng)前幀
返回:
tuple: (has_motion, contours, processed_frame)
"""
# 預(yù)處理當(dāng)前幀
processed_frame = self.preprocess_frame(current_frame)
# 如果沒有前一幀,初始化并返回?zé)o運動
if self.previous_frame is None:
self.previous_frame = processed_frame
return False, [], processed_frame
# 計算當(dāng)前幀與前一幀的絕對差
frame_delta = cv2.absdiff(self.previous_frame, processed_frame)
# 二值化差分圖像
thresh = cv2.threshold(frame_delta, self.threshold, 255, cv2.THRESH_BINARY)[1]
# 膨脹二值圖像以填充孔洞
thresh = cv2.dilate(thresh, None, iterations=2)
# 查找輪廓
contours, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 過濾小輪廓
significant_contours = []
motion_detected = False
for contour in contours:
if cv2.contourArea(contour) > self.min_area:
significant_contours.append(contour)
motion_detected = True
# 更新前一幀
self.previous_frame = processed_frame
# 更新運動狀態(tài)
self.update_motion_status(motion_detected)
return motion_detected, significant_contours, processed_frame
def update_motion_status(self, current_motion):
"""更新運動狀態(tài)計數(shù)器"""
if current_motion:
self.motion_counter += 1
if self.motion_counter >= 3: # 連續(xù)3幀檢測到運動才確認(rèn)
self.motion_detected = True
if self.motion_start_time is None:
self.motion_start_time = time.time()
else:
self.motion_counter = max(0, self.motion_counter - 1)
if self.motion_counter == 0:
self.motion_detected = False
self.motion_start_time = None
def draw_motion_areas(self, frame, contours):
"""
在幀上繪制運動區(qū)域
參數(shù):
frame: 原始幀
contours: 運動輪廓列表
返回:
繪制了運動區(qū)域的幀
"""
output_frame = frame.copy()
for contour in contours:
# 計算邊界框
(x, y, w, h) = cv2.boundingRect(contour)
# 繪制邊界框
cv2.rectangle(output_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# 添加標(biāo)簽
cv2.putText(output_frame, "Motion", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
return output_frame
def get_motion_duration(self):
"""獲取運動持續(xù)時間(秒)"""
if self.motion_start_time is not None:
return time.time() - self.motion_start_time
return 0
class MotionRecordingSystem:
"""
運動觸發(fā)的錄像系統(tǒng)
"""
def __init__(self, output_dir="recordings", pre_motion_buffer=30, post_motion_buffer=30):
"""
初始化錄像系統(tǒng)
參數(shù):
output_dir (str): 錄像保存目錄
pre_motion_buffer (int): 運動前緩沖幀數(shù)
post_motion_buffer (int): 運動后緩沖幀數(shù)
"""
self.output_dir = output_dir
self.pre_motion_buffer = pre_motion_buffer
self.post_motion_buffer = post_motion_buffer
# 創(chuàng)建輸出目錄
os.makedirs(output_dir, exist_ok=True)
        # State variables
        self.is_recording = False
        self.frame_buffer = []
        self.video_writer = None
        self.recording_start_time = None
        self.post_motion_counter = 0  # frames written since motion was last seen
def start_recording(self, frame, fps=20.0):
"""開始錄像"""
if self.is_recording:
return
# 生成文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.output_dir, f"motion_{timestamp}.avi")
# 獲取幀尺寸
height, width = frame.shape[:2]
# 初始化視頻寫入器
fourcc = cv2.VideoWriter_fourcc(*'XVID')
self.video_writer = cv2.VideoWriter(filename, fourcc, fps, (width, height))
# 寫入緩沖幀
for buffered_frame in self.frame_buffer:
self.video_writer.write(buffered_frame)
self.is_recording = True
self.recording_start_time = time.time()
print(f"開始錄像: {filename}")
def stop_recording(self):
"""停止錄像"""
if self.is_recording and self.video_writer is not None:
self.video_writer.release()
self.video_writer = None
self.is_recording = False
duration = time.time() - self.recording_start_time
print(f"停止錄像,時長: {duration:.2f}秒")
def process_frame(self, frame, motion_detected):
"""
處理幀并管理錄像
參數(shù):
frame: 當(dāng)前幀
motion_detected (bool): 是否檢測到運動
"""
# 維護(hù)幀緩沖區(qū)
self.frame_buffer.append(frame.copy())
if len(self.frame_buffer) > self.pre_motion_buffer:
self.frame_buffer.pop(0)
        # Recording logic
        if motion_detected:
            self.post_motion_counter = 0
            if not self.is_recording:
                self.start_recording(frame)
            # Write the current frame
            if self.video_writer is not None:
                self.video_writer.write(frame)
        elif self.is_recording:
            # Motion has ended: keep writing for post_motion_buffer more frames,
            # then stop the recording.
            self.post_motion_counter += 1
            if self.video_writer is not None:
                self.video_writer.write(frame)
            if self.post_motion_counter >= self.post_motion_buffer:
                self.stop_recording()
# 運動檢測演示
def demo_motion_detection(stream_url):
"""
運動檢測演示函數(shù)
"""
# 初始化組件
stream = EnhancedCameraStream(stream_url)
detector = MotionDetector(min_area=1000)
recorder = MotionRecordingSystem()
stream.start()
print("運動檢測系統(tǒng)啟動")
print("按 'q' 退出,按 'r' 重置背景幀")
try:
while True:
success, frame = stream.read()
if not success:
time.sleep(0.1)
continue
# 檢測運動
motion_detected, contours, processed_frame = detector.detect_motion(frame)
# 繪制運動區(qū)域
if motion_detected:
frame = detector.draw_motion_areas(frame, contours)
# 顯示運動信息
duration = detector.get_motion_duration()
cv2.putText(frame, f"Motion: {duration:.1f}s", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
# 處理錄像
recorder.process_frame(frame, motion_detected)
# 顯示狀態(tài)信息
status = "RECORDING" if recorder.is_recording else "Monitoring"
cv2.putText(frame, f"Status: {status}", (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
# 顯示幀
cv2.imshow("Motion Detection", frame)
# 鍵盤輸入處理
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
detector.previous_frame = None
print("背景幀已重置")
finally:
if recorder.is_recording:
recorder.stop_recording()
stream.stop()
cv2.destroyAllWindows()
if __name__ == "__main__":
# 使用示例
stream_url = "http://192.168.1.100:8080/video" # 替換為實際URL
demo_motion_detection(stream_url)
6. Advanced Features: Face Detection and Object Detection
Integrating face detection
# face_detection.py
import cv2
import numpy as np
import os
import time
from datetime import datetime

# Classes from the previous sections
from enhanced_stream import EnhancedCameraStream
from motion_detector import MotionDetector, MotionRecordingSystem
class FaceDetector:
"""
人臉檢測器類
"""
def __init__(self, model_path=None, confidence_threshold=0.5):
"""
初始化人臉檢測器
參數(shù):
model_path (str): 模型文件路徑
confidence_threshold (float): 置信度閾值
"""
self.confidence_threshold = confidence_threshold
# 加載人臉檢測模型
if model_path and os.path.exists(model_path):
self.net = cv2.dnn.readNetFromTensorflow(model_path)
self.model_loaded = True
else:
# 使用OpenCV內(nèi)置的Haar級聯(lián)分類器
self.face_cascade = cv2.CascadeClassifier(
cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)
self.model_loaded = False
def detect_faces(self, frame):
"""
檢測人臉
參數(shù):
frame: 輸入幀
返回:
list: 人臉邊界框列表 [(x, y, w, h), ...]
"""
if self.model_loaded:
return self._detect_faces_dnn(frame)
else:
return self._detect_faces_haar(frame)
def _detect_faces_haar(self, frame):
"""使用Haar級聯(lián)檢測人臉"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 檢測人臉
faces = self.face_cascade.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30),
flags=cv2.CASCADE_SCALE_IMAGE
)
return faces
def _detect_faces_dnn(self, frame):
"""使用DNN模型檢測人臉"""
(h, w) = frame.shape[:2]
# 構(gòu)建blob
blob = cv2.dnn.blobFromImage(
cv2.resize(frame, (300, 300)), 1.0, (300, 300),
(104.0, 177.0, 123.0)
)
# 通過網(wǎng)絡(luò)前向傳播
self.net.setInput(blob)
detections = self.net.forward()
faces = []
# 遍歷檢測結(jié)果
for i in range(0, detections.shape[2]):
confidence = detections[0, 0, i, 2]
# 過濾弱檢測
if confidence > self.confidence_threshold:
# 計算邊界框坐標(biāo)
box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
(startX, startY, endX, endY) = box.astype("int")
# 確保邊界框在圖像尺寸內(nèi)
startX = max(0, startX)
startY = max(0, startY)
endX = min(w, endX)
endY = min(h, endY)
faces.append((startX, startY, endX - startX, endY - startY))
return faces
def draw_faces(self, frame, faces):
"""
在幀上繪制人臉邊界框
參數(shù):
frame: 原始幀
faces: 人臉邊界框列表
返回:
繪制了人臉邊界框的幀
"""
output_frame = frame.copy()
for (x, y, w, h) in faces:
# 繪制邊界框
cv2.rectangle(output_frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
# 添加標(biāo)簽
cv2.putText(output_frame, "Face", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
return output_frame
class AdvancedSurveillanceSystem:
"""
高級監(jiān)控系統(tǒng):集成運動檢測和人臉識別
"""
def __init__(self, stream_url):
"""
初始化高級監(jiān)控系統(tǒng)
參數(shù):
stream_url (str): 視頻流URL
"""
self.stream_url = stream_url
# 初始化各個組件
self.stream = EnhancedCameraStream(stream_url)
self.motion_detector = MotionDetector(min_area=800)
self.face_detector = FaceDetector()
self.recorder = MotionRecordingSystem()
# 統(tǒng)計信息
self.stats = {
'total_frames': 0,
'motion_events': 0,
'face_detections': 0,
'start_time': time.time()
}
def start(self):
"""啟動監(jiān)控系統(tǒng)"""
self.stream.start()
print("高級監(jiān)控系統(tǒng)啟動")
try:
while True:
success, frame = self.stream.read()
if not success:
time.sleep(0.1)
continue
self.stats['total_frames'] += 1
# 運動檢測
motion_detected, motion_contours, _ = self.motion_detector.detect_motion(frame)
if motion_detected:
self.stats['motion_events'] += 1
frame = self.motion_detector.draw_motion_areas(frame, motion_contours)
# 人臉檢測(只在檢測到運動時進(jìn)行,以節(jié)省計算資源)
faces = []
if motion_detected:
faces = self.face_detector.detect_faces(frame)
if faces:
self.stats['face_detections'] += 1
frame = self.face_detector.draw_faces(frame, faces)
# 錄像管理
self.recorder.process_frame(frame, motion_detected)
# 顯示統(tǒng)計信息
frame = self._draw_statistics(frame)
# 顯示幀
cv2.imshow("Advanced Surveillance", frame)
# 鍵盤控制
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
self.motion_detector.previous_frame = None
print("背景幀已重置")
elif key == ord('s'):
self._save_snapshot(frame)
finally:
self.stop()
def _draw_statistics(self, frame):
"""在幀上繪制統(tǒng)計信息"""
# 計算運行時間
run_time = time.time() - self.stats['start_time']
fps = self.stats['total_frames'] / run_time if run_time > 0 else 0
# 繪制統(tǒng)計信息
stats_text = [
f"FPS: {fps:.1f}",
f"Motion Events: {self.stats['motion_events']}",
f"Face Detections: {self.stats['face_detections']}",
f"Run Time: {run_time:.0f}s"
]
for i, text in enumerate(stats_text):
y_position = 30 + i * 25
cv2.putText(frame, text, (10, y_position),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 錄像狀態(tài)
status = "RECORDING" if self.recorder.is_recording else "MONITORING"
color = (0, 0, 255) if self.recorder.is_recording else (0, 255, 0)
cv2.putText(frame, f"Status: {status}", (frame.shape[1] - 200, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
return frame
def _save_snapshot(self, frame):
"""保存快照"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.recorder.output_dir, f"snapshot_{timestamp}.jpg")
cv2.imwrite(filename, frame)
print(f"快照已保存: {filename}")
def stop(self):
"""停止監(jiān)控系統(tǒng)"""
if self.recorder.is_recording:
self.recorder.stop_recording()
self.stream.stop()
cv2.destroyAllWindows()
# 打印最終統(tǒng)計
run_time = time.time() - self.stats['start_time']
print(f"\n監(jiān)控系統(tǒng)運行統(tǒng)計:")
print(f"總運行時間: {run_time:.0f}秒")
print(f"處理幀數(shù): {self.stats['total_frames']}")
print(f"運動事件: {self.stats['motion_events']}")
print(f"人臉檢測: {self.stats['face_detections']}")
# 使用示例
if __name__ == "__main__":
stream_url = "http://192.168.1.100:8080/video" # 替換為實際URL
system = AdvancedSurveillanceSystem(stream_url)
system.start()
7. The Complete Surveillance System
Below is a complete implementation that ties all of the pieces together behind a user-friendly interface.
# complete_surveillance_system.py
import cv2
import numpy as np
import time
import threading
import os
import json
from datetime import datetime
import smtplib
from email.mime.text import MIMEText            # note: the class names are MIMEText/MIMEMultipart
from email.mime.multipart import MIMEMultipart

# Classes from the previous sections
from enhanced_stream import EnhancedCameraStream
from motion_detector import MotionDetector, MotionRecordingSystem
from face_detection import FaceDetector
class CompleteSurveillanceSystem:
"""
完整的監(jiān)控系統(tǒng)
"""
def __init__(self, config_file="config.json"):
"""
初始化完整監(jiān)控系統(tǒng)
參數(shù):
config_file (str): 配置文件路徑
"""
self.load_config(config_file)
self.initialize_components()
self.running = False
def load_config(self, config_file):
"""加載配置文件"""
default_config = {
"stream_url": "http://192.168.1.100:8080/video",
"output_dir": "surveillance_recordings",
"min_motion_area": 800,
"motion_threshold": 25,
"face_detection": True,
"recording": {
"pre_buffer": 30,
"post_buffer": 30,
"fps": 20
},
"notifications": {
"enabled": False,
"email": {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "your_email@gmail.com",
"password": "your_password",
"to_email": "recipient@gmail.com"
}
},
"display": {
"show_fps": True,
"show_stats": True,
"window_width": 800,
"window_height": 600
}
}
if os.path.exists(config_file):
with open(config_file, 'r') as f:
self.config = json.load(f)
print("配置文件加載成功")
else:
self.config = default_config
self.save_config(config_file)
print("使用默認(rèn)配置,請編輯 config.json 文件")
def save_config(self, config_file):
"""保存配置文件"""
with open(config_file, 'w') as f:
json.dump(self.config, f, indent=4)
def initialize_components(self):
"""初始化所有組件"""
# 創(chuàng)建輸出目錄
os.makedirs(self.config["output_dir"], exist_ok=True)
# 初始化視頻流
self.stream = EnhancedCameraStream(self.config["stream_url"])
# 初始化運動檢測器
self.motion_detector = MotionDetector(
min_area=self.config["min_motion_area"],
threshold=self.config["motion_threshold"]
)
# 初始化人臉檢測器
if self.config["face_detection"]:
self.face_detector = FaceDetector()
else:
self.face_detector = None
# 初始化錄像系統(tǒng)
self.recorder = MotionRecordingSystem(
output_dir=self.config["output_dir"],
pre_motion_buffer=self.config["recording"]["pre_buffer"],
post_motion_buffer=self.config["recording"]["post_buffer"]
)
# 統(tǒng)計信息
self.stats = {
'start_time': time.time(),
'total_frames': 0,
'motion_events': 0,
'face_detections': 0,
'recordings': 0
}
# 事件日志
self.event_log = []
def send_notification(self, event_type, details):
"""
發(fā)送通知
參數(shù):
event_type (str): 事件類型
details (str): 事件詳情
"""
if not self.config["notifications"]["enabled"]:
return
try:
email_config = self.config["notifications"]["email"]
# 創(chuàng)建郵件
            msg = MIMEMultipart()
msg['From'] = email_config["username"]
msg['To'] = email_config["to_email"]
msg['Subject'] = f"Surveillance Alert: {event_type}"
body = f"""
Surveillance System Alert
Event Type: {event_type}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Details: {details}
This is an automated message from your surveillance system.
"""
            msg.attach(MIMEText(body, 'plain'))
# 發(fā)送郵件
server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
server.starttls()
server.login(email_config["username"], email_config["password"])
server.send_message(msg)
server.quit()
print(f"通知已發(fā)送: {event_type}")
except Exception as e:
print(f"發(fā)送通知失敗: {e}")
def log_event(self, event_type, details):
"""
記錄事件
參數(shù):
event_type (str): 事件類型
details (str): 事件詳情
"""
event = {
'timestamp': datetime.now().isoformat(),
'type': event_type,
'details': details
}
self.event_log.append(event)
# 保存到文件
log_file = os.path.join(self.config["output_dir"], "events.json")
with open(log_file, 'w') as f:
json.dump(self.event_log, f, indent=2)
def process_frame(self, frame):
"""處理單幀"""
self.stats['total_frames'] += 1
# 運動檢測
motion_detected, motion_contours, _ = self.motion_detector.detect_motion(frame)
if motion_detected:
self.stats['motion_events'] += 1
frame = self.motion_detector.draw_motion_areas(frame, motion_contours)
# 記錄運動事件
if self.stats['motion_events'] % 10 == 1: # 避免過多記錄
self.log_event("motion", f"Motion detected with {len(motion_contours)} areas")
# 人臉檢測
faces = []
if self.face_detector and motion_detected:
faces = self.face_detector.detect_faces(frame)
if faces:
self.stats['face_detections'] += 1
frame = self.face_detector.draw_faces(frame, faces)
# 發(fā)送人臉檢測通知
self.send_notification("Face Detected",
f"{len(faces)} face(s) detected")
self.log_event("face_detection", f"{len(faces)} face(s) detected")
# 錄像管理
was_recording = self.recorder.is_recording
self.recorder.process_frame(frame, motion_detected)
if self.recorder.is_recording and not was_recording:
self.stats['recordings'] += 1
self.send_notification("Recording Started", "Motion-triggered recording started")
self.log_event("recording_start", "Motion-triggered recording")
return frame, motion_detected, faces
def draw_overlay(self, frame, motion_detected, faces):
"""在幀上繪制疊加信息"""
overlay = frame.copy()
# 顯示統(tǒng)計信息
if self.config["display"]["show_stats"]:
run_time = time.time() - self.stats['start_time']
fps = self.stats['total_frames'] / run_time if run_time > 0 else 0
stats = [
f"Time: {datetime.now().strftime('%H:%M:%S')}",
f"FPS: {fps:.1f}",
f"Motion: {self.stats['motion_events']}",
f"Faces: {self.stats['face_detections']}",
f"Recordings: {self.stats['recordings']}"
]
for i, text in enumerate(stats):
y_pos = 30 + i * 25
cv2.putText(overlay, text, (10, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# 顯示狀態(tài)指示器
status_color = (0, 0, 255) if motion_detected else (0, 255, 0)
status_text = "ALERT" if motion_detected else "NORMAL"
cv2.putText(overlay, status_text, (overlay.shape[1] - 120, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, status_color, 2)
# 錄像狀態(tài)
if self.recorder.is_recording:
cv2.putText(overlay, "REC", (overlay.shape[1] - 50, 70),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
return overlay
def start(self):
"""啟動監(jiān)控系統(tǒng)"""
print("啟動完整監(jiān)控系統(tǒng)...")
print("控制命令:")
print(" q - 退出")
print(" r - 重置背景幀")
print(" s - 保存快照")
print(" p - 暫停/繼續(xù)")
print(" n - 切換通知開關(guān)")
self.stream.start()
self.running = True
paused = False
try:
while self.running:
if not paused:
success, frame = self.stream.read()
if success:
# 處理幀
processed_frame, motion_detected, faces = self.process_frame(frame)
# 添加疊加信息
display_frame = self.draw_overlay(processed_frame, motion_detected, faces)
# 調(diào)整顯示尺寸
if (self.config["display"]["window_width"] > 0 and
self.config["display"]["window_height"] > 0):
display_frame = cv2.resize(
display_frame,
(self.config["display"]["window_width"],
self.config["display"]["window_height"])
)
# 顯示幀
cv2.imshow("Complete Surveillance System", display_frame)
# 鍵盤輸入處理
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
self.motion_detector.previous_frame = None
print("背景幀已重置")
elif key == ord('s'):
self._save_snapshot(frame if success else None)
elif key == ord('p'):
paused = not paused
print("系統(tǒng)已暫停" if paused else "系統(tǒng)已繼續(xù)")
elif key == ord('n'):
self.config["notifications"]["enabled"] = not self.config["notifications"]["enabled"]
status = "啟用" if self.config["notifications"]["enabled"] else "禁用"
print(f"通知功能已{status}")
time.sleep(0.01) # 小延遲以減少CPU占用
except KeyboardInterrupt:
print("系統(tǒng)被用戶中斷")
finally:
self.stop()
def _save_snapshot(self, frame):
"""保存快照"""
if frame is not None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.config["output_dir"], f"snapshot_{timestamp}.jpg")
cv2.imwrite(filename, frame)
print(f"快照已保存: {filename}")
else:
print("無法保存快照:無有效幀")
def stop(self):
"""停止監(jiān)控系統(tǒng)"""
self.running = False
if self.recorder.is_recording:
self.recorder.stop_recording()
self.stream.stop()
cv2.destroyAllWindows()
# 生成最終報告
self.generate_report()
print("監(jiān)控系統(tǒng)已停止")
def generate_report(self):
"""生成運行報告"""
run_time = time.time() - self.stats['start_time']
report = {
'session_start': datetime.fromtimestamp(self.stats['start_time']).isoformat(),
'session_end': datetime.now().isoformat(),
'total_duration_seconds': run_time,
'total_frames_processed': self.stats['total_frames'],
'average_fps': self.stats['total_frames'] / run_time if run_time > 0 else 0,
'motion_events': self.stats['motion_events'],
'face_detections': self.stats['face_detections'],
'recordings_made': self.stats['recordings'],
'events_logged': len(self.event_log)
}
# 保存報告
report_file = os.path.join(self.config["output_dir"],
f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(report_file, 'w') as f:
json.dump(report, f, indent=2)
print(f"運行報告已保存: {report_file}")
return report
# 主程序入口
if __name__ == "__main__":
# 創(chuàng)建默認(rèn)配置文件(如果不存在)
if not os.path.exists("config.json"):
system = CompleteSurveillanceSystem()
print("請編輯 config.json 文件配置您的監(jiān)控系統(tǒng),然后重新運行程序。")
else:
system = CompleteSurveillanceSystem()
system.start()
8. Performance Optimization and Troubleshooting
8.1 Performance Tips
Tune the video stream parameters:
- Lower the resolution (720p is usually enough)
- Reduce the frame rate (15-20 fps)
- Use MJPEG encoding instead of H.264
Optimize the processing pipeline:
- Only run face detection when motion has already been detected
- Use separate threads for different tasks
- Pick a sensible detection interval (see the sketch after this list)
Memory management:
- Release resources as soon as they are no longer needed
- Cap memory use with a bounded frame buffer
- Clean up temporary files regularly
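As a concrete example of the detection-interval and resolution tips above, the following sketch runs motion detection only on every Nth frame and on a downscaled copy of the image. It assumes the MotionDetector class from section 5; the interval and scale factors are arbitrary starting points rather than tuned values.
# Illustrative sketch: detect on a half-resolution copy, and only on every 3rd frame.
import cv2
from motion_detector import MotionDetector  # class from section 5

DETECT_EVERY = 3   # run detection on every 3rd frame
SCALE = 0.5        # detect at half resolution

# Contour areas shrink by SCALE**2, so scale min_area down to match.
detector = MotionDetector(min_area=int(800 * SCALE * SCALE))
frame_index = 0
last_result = False

def cheap_motion_check(frame):
    """Return the most recent motion result, refreshing it only every few frames."""
    global frame_index, last_result
    frame_index += 1
    if frame_index % DETECT_EVERY == 0:
        small = cv2.resize(frame, None, fx=SCALE, fy=SCALE)
        last_result, _, _ = detector.detect_motion(small)
    return last_result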
8.2 Common Problems and Solutions
Problem 1: the video stream will not connect
- Check that the phone and the PC are on the same network
- Verify the IP address and port number
- Check the firewall settings
Problem 2: high CPU usage
- Lower the processing resolution
- Reduce the detection frequency
- Use hardware acceleration where available
Problem 3: too many false alarms
- Adjust the motion-detection threshold
- Increase the minimum detection area
- Switch to a more sophisticated background-subtraction algorithm (see the sketch below)
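For the background-subtraction suggestion above, OpenCV ships adaptive subtractors such as MOG2 and KNN that cope with gradual lighting changes much better than plain frame differencing. A minimal sketch follows; the parameter values are reasonable defaults, not values tuned for any particular scene.
# Illustrative sketch: motion mask with OpenCV's built-in MOG2 background subtractor.
import cv2

subtractor = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=25,
                                                detectShadows=True)

def motion_mask(frame, min_area=800):
    """Return (motion_detected, mask) for one frame."""
    mask = subtractor.apply(frame)
    # MOG2 marks shadows as 127; keep only confident foreground pixels.
    _, mask = cv2.threshold(mask, 200, 255, cv2.THRESH_BINARY)
    mask = cv2.dilate(mask, None, iterations=2)
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    moving = any(cv2.contourArea(c) > min_area for c in contours)
    return moving, mask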
Problem 4: too much latency
- Use a wired network connection for the PC where possible
- Tune the video encoding settings
- Simplify the processing pipeline
9. Security and Privacy Considerations
Security and privacy are critical whenever a monitoring system is deployed:
Network security:
- Use a WiFi network protected with WPA2/WPA3 encryption
- Change the router password regularly
- Consider a VPN for remote access
Data protection:
- Keep recordings in local storage rather than uploading them to the cloud
- Encrypt sensitive recordings (see the sketch after this list)
- Clean out old recordings regularly
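For the encryption suggestion above, one simple option is symmetric encryption with the third-party cryptography package (pip install cryptography). The sketch below is illustrative only: it reads the whole clip into memory, which is fine for short recordings, and it leaves key management (where secret.key lives and who can read it) entirely up to you.
# Illustrative sketch: encrypt a finished recording with Fernet symmetric encryption.
from cryptography.fernet import Fernet

def encrypt_recording(path, key_file="secret.key"):
    # Generate and store the key once; reuse it for later recordings.
    try:
        with open(key_file, "rb") as f:
            key = f.read()
    except FileNotFoundError:
        key = Fernet.generate_key()
        with open(key_file, "wb") as f:
            f.write(key)
    fernet = Fernet(key)
    # Read, encrypt, and write out the clip (suitable for short files).
    with open(path, "rb") as f:
        encrypted = fernet.encrypt(f.read())
    out_path = path + ".enc"
    with open(out_path, "wb") as f:
        f.write(encrypted)
    return out_path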
Privacy compliance:
- Only monitor your own private property
- Tell household members and visitors that monitoring is in place
- Comply with local privacy laws and regulations
10. Extensions and Future Improvements
10.1 Possible Extensions
- Remote access: view the camera feed remotely through a web interface (see the sketch after this list)
- Cloud storage integration: back up important recordings to cloud storage
- Smarter analytics: use deep-learning models for more accurate behavior analysis
- Multi-camera support: monitor several locations at the same time
- Mobile app: a companion app that receives real-time notifications
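For the remote-access idea, a small Flask app (pip install flask) can re-serve the processed frames as an MJPEG stream that any browser on the local network can open. The sketch below assumes the EnhancedCameraStream class from section 4.2 and the placeholder phone URL used throughout this article; add authentication and TLS before exposing it beyond your own LAN.
# web_view.py  (illustrative: open http://<pc-ip>:5000/ in a browser on the LAN)
import time
import cv2
from flask import Flask, Response
from enhanced_stream import EnhancedCameraStream  # class from section 4.2

app = Flask(__name__)
stream = EnhancedCameraStream("http://192.168.1.100:8080/video")  # placeholder URL
stream.start()

def mjpeg_generator():
    """Yield the latest frame as a multipart MJPEG stream."""
    while True:
        ok, frame = stream.read()
        if not ok:
            time.sleep(0.05)
            continue
        ok, jpeg = cv2.imencode(".jpg", frame)
        if not ok:
            continue
        yield (b"--frame\r\n"
               b"Content-Type: image/jpeg\r\n\r\n" + jpeg.tobytes() + b"\r\n")

@app.route("/")
def index():
    return Response(mjpeg_generator(),
                    mimetype="multipart/x-mixed-replace; boundary=frame")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)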
10.2 Directions for Technical Improvement
Algorithm improvements:
- Use a modern object detector such as YOLO (see the sketch after this list)
- Add person re-identification
- Add anomalous-behavior detection
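As an example of dropping in a modern detector, the sketch below uses the third-party Ultralytics package (pip install ultralytics) to find people in a frame. The package, the yolov8n.pt model name, and the confidence threshold are assumptions outside this article's code, so treat it as a starting point rather than a drop-in part of the system above.
# Illustrative sketch: person detection with an off-the-shelf YOLO model.
# Assumes `pip install ultralytics`; "yolov8n.pt" is downloaded on first use.
import cv2
from ultralytics import YOLO

model = YOLO("yolov8n.pt")

def detect_people(frame, conf=0.5):
    """Return a list of (x, y, w, h) boxes for detected persons."""
    results = model(frame, verbose=False)[0]
    boxes = []
    for box in results.boxes:
        # COCO class 0 is "person"; keep detections above the confidence threshold.
        if int(box.cls[0]) == 0 and float(box.conf[0]) >= conf:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            boxes.append((x1, y1, x2 - x1, y2 - y1))
    return boxes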
System architecture:
- A microservice architecture to make scaling easier
- Containerized deployment
- Distributed processing
11. Conclusion
This article walked through turning an old phone into a fully featured surveillance camera. Using Python and OpenCV we implemented:
- Basic stream capture: reliably pulling the video stream from the phone's camera
- Motion detection: recognizing meaningful changes in the scene
- Face detection: finding and marking faces in the frame
- Automatic recording: efficient, motion-triggered recording
- Notifications: timely alerts when events occur
- Full system integration: all of the above combined into a single monitoring solution
The result is cheap and highly customizable: it can be adjusted and extended to fit specific needs, and it gives an old phone a second life as a capable security device.
As the underlying technology improves, more intelligent features can be layered on top to make the system smarter, more efficient, and easier to use. Hopefully this article gives you a solid starting point for your own DIY monitoring project.
Note: when deploying any surveillance system, always comply with local laws and regulations, respect other people's privacy, and keep the system itself secure.