Python實現從網絡攝像頭拉流的方法分享
摘要
本文介紹幾種從攝像頭拉流的方法。
1、直接使用OpenCV
直接使用opencv的cv2.VideoCapture直接讀取rtsp視頻流,但是這樣做的缺點是延遲嚴重、出現掉幀、花屏現象等,原因在于opencv自己有一個緩存,每次會順序從自己的緩存中讀取,而不是直接讀取最新幀。
代碼如下:
import cv2
import datetime
def time_str(fmt=None):
if fmt is None:
fmt = '%Y_%m_%d_%H_%M_%S'
return datetime.datetime.today().strftime(fmt)
user_name, user_pwd = "admin", "1234"
ca_ip="192.168.1.100"
channel=2
cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" \
% (user_name, user_pwd, ca_ip, channel))
if cap.isOpened():
print("Opened")
while cap.isOpened():
ret, frame = cap.read()
cv2.imwrite("opencv_"+time_str() + ".jpg", frame)

2、使用ffmpeg
FFmpeg是一套強大的視頻、音頻處理程序,也是很多視頻處理軟件的基礎 。但是FFmpeg的命令行使用起來有一定的學習成本。而ffmpeg-python就是解決FFmpeg學習成本的問題,讓開發(fā)者使用python就可以調用FFmpeg的功能,既減少了學習成本,也增加了代碼的可讀性。

github地址:https://github.com/kkroening/ffmpeg-python
2.1、安裝方法
2.1.1、安裝ffmpeg-python
ffmpeg-python可以通過典型的 pip 安裝獲取最新版本(注意:是ffmpeg-python,不要寫成了python-ffmpeg):
pip install ffmpeg-python
或者可以從本地克隆和安裝源:
git clone git@github.com:kkroening/ffmpeg-python.git pip install -e ./ffmpeg-python
2.1.2、安裝FFmpeg
使用該庫,需要自行安裝FFmpeg,如果電腦已經安裝了,可以忽略本步驟。這里推薦直接使用conda進行安裝,可以省下很多麻煩,其他的安裝方式自行百度。
conda install ffmpeg
2.2、代碼實現
使用ffmpeg讀取rtsp流并轉換成numpy array,并使用cv2.imwrite保存。
import ffmpeg
import numpy as np
import cv2
import datetime
def main(source):
args = {
"rtsp_transport": "tcp",
"fflags": "nobuffer",
"flags": "low_delay"
} # 添加參數
probe = ffmpeg.probe(source)
cap_info = next(x for x in probe['streams'] if x['codec_type'] == 'video')
print("fps: {}".format(cap_info['r_frame_rate']))
width = cap_info['width'] # 獲取視頻流的寬度
height = cap_info['height'] # 獲取視頻流的高度
up, down = str(cap_info['r_frame_rate']).split('/')
fps = eval(up) / eval(down)
print("fps: {}".format(fps)) # 讀取可能會出錯錯誤
process1 = (
ffmpeg
.input(source, **args)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.overwrite_output()
.run_async(pipe_stdout=True)
)
while True:
in_bytes = process1.stdout.read(width * height * 3) # 讀取圖片
if not in_bytes:
break
# 轉成ndarray
in_frame = (
np
.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
frame = cv2.cvtColor(in_frame, cv2.COLOR_RGB2BGR) # 轉成BGR
# cv2.imshow(time_str(), frame)
cv2.imwrite(time_str()+".jpg", frame)
# if cv2.waitKey(1) == ord('q'):
# break
process1.kill() # 關閉
def time_str(fmt=None):
if fmt is None:
fmt = '%Y_%m_%d_%H_%M_%S'
return datetime.datetime.today().strftime(fmt)
if __name__ == "__main__":
# rtsp流需要換成自己的
user_name, user_pwd = "admin", "1234"
ca_ip = "192.168.1.168"
channel = 2
alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" \
% (user_name, user_pwd, ca_ip, channel)
main(alhua_rtsp)
3、多線程的方式讀取圖片
采用多線程的方式,新開一個線程,利用變量、隊列等方式保存最新幀,使得每次都讀取最新幀,而不是opencv自己緩存中的順序幀,不會延遲,不會花屏了,代碼如下:
import cv2
import threading
import sys
import datetime
def time_str(fmt=None):
if fmt is None:
fmt = '%Y_%m_%d_%H_%M_%S'
return datetime.datetime.today().strftime(fmt)
class RTSCapture(cv2.VideoCapture):
_cur_frame = None
_reading = False
schemes = ["rtsp://","rtmp://"]
@staticmethod
def create(url, *schemes):
rtscap = RTSCapture(url)
rtscap.frame_receiver = threading.Thread(target=rtscap.recv_frame, daemon=True)
rtscap.schemes.extend(schemes)
if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)):
rtscap._reading = True
elif isinstance(url, int):
pass
return rtscap
def isStarted(self):
ok = self.isOpened()
if ok and self._reading:
ok = self.frame_receiver.is_alive()
return ok
def recv_frame(self):
while self._reading and self.isOpened():
ok, frame = self.read()
if not ok: break
self._cur_frame = frame
self._reading = False
def read2(self):
frame = self._cur_frame
self._cur_frame = None
return frame is not None, frame
def start_read(self):
self.frame_receiver.start()
self.read_latest_frame = self.read2 if self._reading else self.read
def stop_read(self):
self._reading = False
if self.frame_receiver.is_alive(): self.frame_receiver.join()
if __name__ == '__main__':
user_name, user_pwd = "admin", "1234"
ca_ip = "192.168.1.100"
channel = 2
alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" \
% (user_name, user_pwd, ca_ip, channel)
rtscap = RTSCapture.create(alhua_rtsp)
rtscap.start_read()
while rtscap.isStarted():
ok, frame = rtscap.read_latest_frame()
# if cv2.waitKey(100) & 0xFF == ord('q'):
# break
if not ok:
continue
# inhere
# cv2.imshow(time_str(), frame)
cv2.imwrite(time_str() + ".jpg", frame)
rtscap.stop_read()
rtscap.release()
cv2.destroyAllWindows()
運行結果:

4、多進程的方式拉流
使用Python3自帶的多進程模塊,創(chuàng)建一個隊列,進程A從通過rtsp協(xié)議從視頻流中讀取出每一幀,并放入隊列中,進程B從隊列中將圖片取出,處理后進行顯示。進程A如果發(fā)現隊列里有兩張圖片(證明進程B的讀取速度跟不上進程A),那么進程A主動將隊列里面的舊圖片刪掉,換上新圖片。通過多線程的方法:
代碼如下:
import cv2
import multiprocessing as mp
import time
import datetime
def time_str(fmt=None):
if fmt is None:
fmt = '%Y_%m_%d_%H_%M_%S'
return datetime.datetime.today().strftime(fmt)
def image_put(q, user, pwd, ip, channel=1):
cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" % (user, pwd, ip, channel))
if cap.isOpened():
print('HIKVISION')
else:
cap = cv2.VideoCapture("rtsp://%s:%s@%s/cam/realmonitor?channel=%d&subtype=0" % (user, pwd, ip, channel))
print('DaHua')
while True:
q.put(cap.read()[1])
q.get() if q.qsize() > 1 else time.sleep(0.01)
def image_get(q, window_name):
# cv2.namedWindow(window_name, flags=cv2.WINDOW_FREERATIO)
while True:
frame = q.get()
# cv2.imshow(window_name, frame)
# cv2.waitKey(1)
cv2.imwrite("opencv_"+time_str() + ".jpg", frame)
cv2.waitKey(1)
def run_single_camera():
user_name, user_pwd, camera_ip = "admin", "admin123456", "192.168.35.121"
mp.set_start_method(method='spawn') # init
queue = mp.Queue(maxsize=2)
processes = [mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)),
mp.Process(target=image_get, args=(queue, camera_ip))]
[process.start() for process in processes]
[process.join() for process in processes]
def run_multi_camera():
# user_name, user_pwd = "admin", "password"
user_name, user_pwd = "admin", "1234"
camera_ip_l = [
"192.168.1.XX3", # ipv4
"192.168.1.XX2",
"192.168.1.XX1",
]
mp.set_start_method(method='spawn') # init
queues = [mp.Queue(maxsize=90) for _ in camera_ip_l]
processes = []
for queue, camera_ip in zip(queues, camera_ip_l):
processes.append(mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)))
processes.append(mp.Process(target=image_get, args=(queue, camera_ip)))
for process in processes:
process.daemon = True
process.start()
for process in processes:
process.join()
if __name__ == '__main__':
# run_single_camera()
run_multi_camera()
pass
到此這篇關于Python實現從網絡攝像頭拉流的方法分享的文章就介紹到這了,更多相關Python網絡攝像頭拉流內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Tensorflow2.10使用BERT從文本中抽取答案實現詳解
這篇文章主要為大家介紹了Tensorflow2.10使用BERT從文本中抽取答案實現詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-04-04
Python編程中NotImplementedError的使用方法
下面小編就為大家分享一篇Python編程中NotImplementedError的使用方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04

