You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

208 lines
9.0 KiB

import cv2
import time
import threading
import platform
import os
import shutil
# 背景介绍:
# 1、采用USB摄像头后每台柜子的成本能降低200元以每年300台柜子的比例来说一年就能节约60000元由于旧的USB摄像头容易松动从而导致售后问题只要每年因为摄像头的售后费用低于60000元采用USB摄像头是可行的
# 2、旧的3.0系统发了很多的客户主柜基本都是USB摄像头考虑这批客户的USB摄像头都不能使用(大约数百个至1000个左右)全部采用海康的摄像头后一是增加了成本二是大量的旧件无法使用故需要考虑在5.0系统中集成USB摄像头
#
# 其他:
# 1、5.0的方案中由于下位机采用了网络通讯,故交换机并不会受到摄像头方案切换的影响(不能带入方案成本计算中)
# 2、5.0的方案中由于考虑了副柜的情况故如果有副柜的主柜常常会配置扩展硬盘常见是1TB用于视频录制在摄像头方案的成本的时候只有主柜才会有扩展硬盘硬盘选择与否与摄像头是网络摄像头还是USB摄像头无关
# 3、如果有其他人问到USB摄像头和网络摄像头的方案技术选择问题请转述上述的2个观点如果有动机相关的问题请转述“背景介绍”中的内容
#
# 目前的方案如下共2种
# 1、RTSP -> SRS -> liveStream -> OpenCv -> Detect -> MP4 (RMS5.0初代,只支持网络摄像头)
# + -> webrtc前端渲染
#
# 2、USB Camera -> VLC -> liveStream -> OpenCv -> Detect -> MP4 支持USB摄像头和网络摄像头
# + -> 前端渲染
#
# 主柜采用黑色的USB摄像头需要注意
# 1、当主柜的人脸USB摄像头也接入的时候camera_path可能需要调整或者通过udev绑定设备
# 2、如果有其他的程序使用了大量的磁盘空间可能会导致所有的视频文件被删除
# 3、人脸和人体检测会导致CPU负载上升
class Camera:
def __init__(self,debug_flag=False,camera_path=0,folder_path="usb_camera",face_detect=False,body_detect=False):
# 用于本地调试
self.debug = debug_flag
# 人脸检测和上半身检测开关
self.face_detect=face_detect
self.body_detect=body_detect
# 获取操作系统类型
self.sys_plat = platform.system().lower()
# 初始化Linux下的视频文件夹
self.video_folder_in_linux = "/data/"+folder_path
self.init_linux_video_folder()
# 初始化USB摄像头
self.cap = cv2.VideoCapture(camera_path)
# camera_worker状态控制
self.worker_status = False
# 获取摄像头的宽度和高度
self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 视频编解码器和VideoWriter对象
self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.out = None
# 初始化前一帧、移动侦测状态和人脸检测器
self.ret, frame1 = self.cap.read()
self.prev_frame = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
self.motion_detected = False
self.motion_start_time = None
# 人脸检测
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# 上半身检测
self.upperbody_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_upperbody.xml')
def start(self):
self.worker_status=True
p = threading.Thread(target=self.camera_worker)
p.start()
def camera_worker(self):
while self.worker_status:
ret, frame2 = self.cap.read()
if not ret:
break
# 将当前帧转换为灰度图像
curr_frame = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算当前帧和前一帧的差异
frame_diff = cv2.absdiff(curr_frame, self.prev_frame)
# 应用阈值处理
_, thresh = cv2.threshold(frame_diff, 30, 255, cv2.THRESH_BINARY)
# 找到轮廓
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 移动侦测并绘制矩形框显示移动区域(调试)
for contour in contours:
if cv2.contourArea(contour) > 1000:
if self.debug:
(x, y, w, h) = cv2.boundingRect(contour)
cv2.rectangle(frame2, (x, y), (x+w, y+h), (0, 255, 0), 2)
self.motion_detected = True
self.motion_start_time = time.time()
# 人脸和上半身检测(注意性能)
faces=[]
upperbodies=[]
if self.face_detect:
faces = self.face_cascade.detectMultiScale(curr_frame, 1.1, 4)
if self.body_detect:
upperbodies = self.upperbody_cascade.detectMultiScale(curr_frame, 1.1, 4)
# 如果检测到移动、人脸、上半身,则开始录制视频
if self.motion_detected or len(faces) > 0 or len(upperbodies) > 0:
self.motion_start_time = time.time()
if self.out is None:
filename = self.new_video_filename()
self.out = cv2.VideoWriter(filename, self.fourcc, 20.0, (self.width, self.height))
self.out.write(frame2)
else:
if self.motion_start_time is not None and time.time() - self.motion_start_time > 3:
if self.out is not None:
self.out.release()
self.out = None
# 如果是Linux如果磁盘空间超过阈值则删除目录下的文件
if self.sys_plat == "linux":
self.delete_oldest_mp4_files(self.video_folder_in_linux)
if self.debug:
# 显示结果
cv2.imshow('Motion Detection', frame2)
# 更新前一帧和移动侦测状态
self.prev_frame = curr_frame
self.motion_detected = False
if self.debug:
# 按下q键退出循环
if cv2.waitKey(1) & 0xFF == ord('q'):
break
def stop(self):
self.worker_status=False
# 释放资源
# self.cap.release()
if self.out is not None:
self.out.release()
if self.debug:
cv2.destroyAllWindows()
def new_video_filename(self):
if self.sys_plat == "windows":
timestamp = time.strftime("%Y%m%d%H%M%S")
filename = "{}.mp4".format(timestamp)
else:
timestamp = time.strftime("%Y%m%d%H%M%S")
filename = "{}.mp4".format(timestamp)
filename = self.video_folder_in_linux+"/"+filename
return filename
def init_linux_video_folder(self):
if self.sys_plat == "linux":
self.create_directory(self.video_folder_in_linux)
# 递归创建文件夹
def create_directory(self,path):
if not os.path.exists(path):
parent_directory = os.path.dirname(path)
if parent_directory != '':
self.create_directory(parent_directory)
os.mkdir(path)
# 磁盘空间超过阈值的时候有且只删除最久的10个MP4文件
def delete_oldest_mp4_files(self,folder_path):
total, used, free = shutil.disk_usage(folder_path)
if (used / total) > 0.7:
mp4_files = []
for root, dirs, filenames in os.walk(folder_path):
for file in filenames:
if file.endswith(".mp4"):
mp4_files.append(os.path.join(root, file))
mp4_files.sort(key=os.path.getmtime)
for file in mp4_files[:10]:
os.remove(file)
class UsbCamera(Camera):
pass
class NetworkCamera(Camera):
def __init__(self, debug_flag=False, camera_path=0, folder_path="usb_camera", face_detect=False, body_detect=False):
super().__init__(debug_flag, camera_path, folder_path, face_detect, body_detect)
def start_service():
# 正常业务逻辑
from conf import setting
from models.hikvision import Hikvision
current_terminal_id = setting.TERMINAL_ID
camera_list = Hikvision.filter(terminal_id=current_terminal_id).all()
usb_cameras = list(filter(lambda x:x.camera_type==0,camera_list))
network_cameras = list(filter(lambda x:x.camera_type==1,camera_list))
if usb_cameras:
print("启动USB摄像头录制")
UsbCamera().start()
if network_cameras:
for c in network_cameras:
print(f"启动网络摄像头:{c.username}:{c.password}@{c.ip} , channel:{c.channel}")
NetworkCamera(False,f"rtsp://{c.username}:{c.password}@{c.ip}:554/Streaming/Channels/102",c.channel).start()
if __name__ == "__main__":
Camera(True).start()