""" Common function for camera based method """ from fractions import Fraction import json import logging import os import time import av import cv2 import numpy as np from settings.config import settings logger = logging.getLogger(__name__) class VideoAnalyser(object): """摄像头/视频数据分析的基类, 实现逐帧分析 Attributes: t_start_save_video (float): 开始保存视频的时间,当使用av保存视频时需要此参数计算pts out_stream: 使用opencv保存数据时使用 container:使用av保存视频时使用 stream: 使用av保存时使用 """ def __init__(self, camera_id=0, input_video=None): if not input_video: # For webcam input: self.camera_id = camera_id self.cap = cv2.VideoCapture(camera_id) # TODO: cv2.CAP_DSHOW 能加速摄像头开启,但会导致视频保存出错? # self.cap = cv2.VideoCapture( # camera_id) if camera_id == 0 else cv2.VideoCapture( # camera_id, cv2.CAP_DSHOW) # 调用外部摄像头需设置cv2.CAP_DSHOW self.is_camera = True else: self.cap = cv2.VideoCapture(input_video) self.is_camera = False # self.cap.setExceptionMode(True) # opencv 4.6 的自动旋转错误,采用自定义的旋转方式 # self.cap.set(cv2.CAP_PROP_ORIENTATION_AUTO, 0.0) # self.rotate_code = self.check_rotation( # self.cap.get(cv2.CAP_PROP_ORIENTATION_META)) self.rotate_code = None self.t_start_save_video = None self.save_with_av = False self.out_stream = None self.container = None self.stream = None self.previous_pts = 0 def __del__(self): # self.cap.release() # logger.info('Camera(%s) closed.', self.__class__.__name__) # if self.out_stream: # self.out_stream.release() # if self.container and self.t_start_save_video: # self.release_container() self.close() def get_save_fps(self): return int(self.cap.get(cv2.CAP_PROP_FPS)) def open_camera(self): success = self.cap.open(self.camera_id) if success: logger.info('Open camera(%s) succeed.', self.__class__.__name__) else: logger.error('Open camera(%s) failed.', self.__class__.__name__) # if camera_id == 0: # self.cap.open(camera_id) # else: # self.cap.open(camera_id, cv2.CAP_DSHOW) def close(self, only_save: bool = False): """关闭摄像头与结束视频保存 如果only_save为true,则结束视频保存,但不关闭摄像头;否则关闭摄像头与结束视频保存 Args: only_save (bool, optional): 是否仅结束视频保存. Defaults to False. """ if not only_save: self.cap.release() logger.info('Camera(%s) closed.', self.__class__.__name__) if self.out_stream: self.out_stream.release() self.out_stream = None self.release_container() self.container = None def set_output_video(self, output_video, save_with_av=False): """ 设置输出视频 使用摄像头的情况下,必须在开摄像头之后调用,否则参数获取失败,无法正确设置输出视频 Args: output_video (string): 要保存的视频文件路径 save_with_av (bool, optional): 使用av库进行保存 """ self.save_with_av = save_with_av if not self.save_with_av: # video info # fourcc = int(self.cap.get(cv2.CAP_PROP_FOURCC)) # NOTICE: 这里需用 avc1 否则前端无法正常显示 fourcc = cv2.VideoWriter_fourcc(*'avc1') fps = self.get_save_fps() frame_size = (int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) # file to save video self.out_stream = cv2.VideoWriter(output_video, fourcc, fps, frame_size) else: assert self.is_camera,\ 'Do not save video with av when process recorded video!' self.container = av.open(output_video, mode='w') # NOTICE: 这里需使用 h264, 否则前端无法正常显示 self.stream = self.container.add_stream( 'h264', rate=int(self.cap.get(cv2.CAP_PROP_FPS))) # alibi frame rate self.stream.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.stream.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.stream.pix_fmt = 'yuv420p' self.stream.codec_context.time_base = Fraction( 1, int(self.cap.get(cv2.CAP_PROP_FPS))) def is_ok(self): if self.cap and self.cap.isOpened(): return True else: logger.debug('Camera not ready!!!') return False def check_rotation(self, rotate): rotate_code = None if int(rotate) == 270: rotate_code = cv2.ROTATE_90_CLOCKWISE elif int(rotate) == 180: rotate_code = cv2.ROTATE_180 elif int(rotate) == 90: rotate_code = cv2.ROTATE_90_COUNTERCLOCKWISE return rotate_code def correct_rotation(self, frame, rotate_code): return cv2.rotate(frame, rotate_code) def process(self, save=True): try: success, image = self.cap.read() if not success: logger.debug('Ignoring empty camera frame.') if self.rotate_code is not None: image = self.correct_rotation(image, self.rotate_code) except cv2.error as exc: logger.error( 'read data from camera(%s) failed, it may be disconnected: %s', self.__class__.__name__, exc) raise exc t_read = time.time() if success and save: self.save_video(image, t_read) return success, image def save_video(self, image, t_read): if self.save_with_av: self.save_video_with_av(image, t_read) else: self.save_video_with_opencv(image) def save_video_with_opencv(self, image): if not self.out_stream: return try: assert self.out_stream.isOpened(), 'Cannot open video for writing' self.out_stream.write(image) except Exception as exc: logger.error('Fail to save video %s: %s', self.out_stream, exc) def save_video_with_av(self, image, t_start): """Save video with [av](https://github.com/PyAV-Org/PyAV) Args: image (np.ndarray): frame to save t_start (float): timestamp of this frame """ if not self.container: return try: if not self.t_start_save_video: self.t_start_save_video = t_start frame = av.VideoFrame.from_ndarray(image, format='bgr24') # Presentation Time Stamp (seconds -> counts of time_base) delta_t = t_start - self.t_start_save_video if delta_t < 0.0: return pts = int(round(delta_t / self.stream.codec_context.time_base)) logger.debug('pts: %d', pts) if pts > self.previous_pts: frame.pts = pts self.previous_pts = frame.pts for packet in self.stream.encode(frame): self.container.mux(packet) except ValueError as exc: logger.debug('Fail to save frame of video %s: %s', self.container, exc) def release_container(self): if self.t_start_save_video: self.av_finish_with_a_blank_frame() # Close the file if self.container: self.container.close() self.t_start_save_video = None self.previous_pts = 0 def av_finish_with_a_blank_frame(self): # finish it with a blank frame, so the "last" frame actually gets # shown for some time this black frame will probably be shown for # 1/fps time at least, that is the analysis of ffprobe try: image = np.zeros((self.stream.height, self.stream.width, 3), dtype=np.uint8) frame = av.VideoFrame.from_ndarray(image, format='bgr24') pts = int( round((time.time() - self.t_start_save_video) / self.stream.codec_context.time_base)) logger.debug('last pts: %d', pts) frame.pts = pts if pts > self.previous_pts else self.previous_pts + 1 for packet in self.stream.encode(frame): self.container.mux(packet) # Flush stream for packet in self.stream.encode(): self.container.mux(packet) except ValueError as exc: logger.debug('Fail to save frame of video %s: %s', self.container, exc) def generator(self): while self.is_ok(): success, frame = self.process() # 使用generator函数输出视频流, 每次请求输出的content类型是image/jpeg if success: # 因为opencv读取的图片并非jpeg格式,因此要用motion JPEG模式需要先将图片转码成jpg格式图片 ret, jpeg = cv2.imencode('.jpg', frame) # t_end = time.time() # logger.debug("Time for process: %fs", t_end - t_start) yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n\r\n') def create_data_dir(subject_id, train_id): """为保存视频数据创建文件夹 Args: subject_id (_type_): _description_ train_id (_type_): _description_ """ path = f'{settings.DATA_PATH}/{subject_id}/{train_id}' try: os.makedirs(path) except OSError: logger.debug('Folder already exists!') return path def json_generator(feeder): while feeder.is_ok(): # time.sleep(1 / 30.0) success, _, data = feeder.process(only_keypoint=False) if success: json_data = json.dumps(data) yield f'data:{json_data}\n\n'