123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- """ Common function for camera based method """
- from fractions import Fraction
- import json
- import logging
- import os
- import time
- import av
- import cv2
- import numpy as np
- from settings.config import settings
- logger = logging.getLogger(__name__)
class VideoAnalyser(object):
    """Base class for camera / video-file analysis, processed frame by frame.

    Attributes:
        camera_id: Device index given to ``cv2.VideoCapture``. Only set when
            the instance reads from a camera (no ``input_video`` given).
        cap: The ``cv2.VideoCapture`` frames are read from.
        is_camera (bool): True for camera input, False for a video file.
        rotate_code: cv2 rotation code applied to every frame that was read
            successfully, or None for no rotation.
        t_start_save_video (float): Timestamp of the first frame handed to
            the av writer; needed to compute pts when saving with av.
        save_with_av (bool): Whether frames are saved with av instead of
            OpenCV.
        out_stream: ``cv2.VideoWriter`` used when saving with OpenCV.
        container: av output container used when saving with av.
        stream: av video stream used when saving with av.
        previous_pts (int): pts of the last frame sent to the av encoder,
            -1 while no frame has been sent yet.
    """

    def __init__(self, camera_id=0, input_video=None):
        """Open the capture source.

        Args:
            camera_id: Camera device index, used when ``input_video`` is
                empty.
            input_video: Path of a recorded video; when given, it is read
                instead of a camera.
        """
        # Plain state first, so that close()/__del__ find every attribute
        # even if opening the capture below raises.
        self.cap = None
        # NOTE: OpenCV 4.6 auto-rotation is wrong, so rotation is handled by
        # this class; it stays disabled unless rotate_code is set.
        self.rotate_code = None
        self.t_start_save_video = None
        self.save_with_av = False
        self.out_stream = None
        self.container = None
        self.stream = None
        # -1 (not 0) so that the very first frame, whose pts is 0, is kept.
        self.previous_pts = -1

        if not input_video:
            # Webcam input.
            # TODO: cv2.CAP_DSHOW speeds up opening an external camera, but
            # seems to break video saving?
            self.camera_id = camera_id
            self.is_camera = True
            self.cap = cv2.VideoCapture(camera_id)
        else:
            self.is_camera = False
            self.cap = cv2.VideoCapture(input_video)

    def __del__(self):
        # __init__ may have failed before the capture existed; there is
        # nothing to release then and close() must not be called.
        if getattr(self, 'cap', None) is None:
            return
        self.close()

    def get_save_fps(self):
        """Return the FPS reported by the capture, truncated to an int."""
        return int(self.cap.get(cv2.CAP_PROP_FPS))

    def open_camera(self):
        """(Re)open the camera ``self.camera_id`` and log the outcome.

        ``camera_id`` is only set for camera input, so this is not usable on
        an instance created from ``input_video``.
        """
        success = self.cap.open(self.camera_id)
        if success:
            logger.info('Open camera(%s) succeed.', self.__class__.__name__)
        else:
            logger.error('Open camera(%s) failed.', self.__class__.__name__)

    def close(self, only_save: bool = False):
        """Close the camera and finish video saving.

        Args:
            only_save (bool, optional): When True, only finish video saving
                and keep the camera open; otherwise release the camera too.
                Defaults to False.
        """
        if not only_save and self.cap is not None:
            self.cap.release()
            logger.info('Camera(%s) closed.', self.__class__.__name__)

        if self.out_stream:
            self.out_stream.release()
            self.out_stream = None

        self.release_container()
        self.container = None
        # The stream belongs to the container that was just closed.
        self.stream = None

    def set_output_video(self, output_video, save_with_av=False):
        """Configure the output video.

        With camera input this must be called after the camera is opened,
        otherwise the capture properties cannot be read and the output
        cannot be configured correctly.

        Args:
            output_video (string): Path of the video file to write.
            save_with_av (bool, optional): Save with the av library instead
                of OpenCV. Only allowed for camera input.

        Raises:
            AssertionError: If ``save_with_av`` is requested while
                processing a recorded video.
        """
        self.save_with_av = save_with_av
        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        if not self.save_with_av:
            # NOTICE: 'avc1' is required, otherwise the front end cannot
            # display the video.
            fourcc = cv2.VideoWriter_fourcc(*'avc1')
            fps = self.get_save_fps()
            self.out_stream = cv2.VideoWriter(output_video, fourcc, fps,
                                              (width, height))
            return

        if not self.is_camera:
            # Explicit raise instead of `assert` so the check survives
            # `python -O`; the exception type is unchanged for callers.
            raise AssertionError(
                'Do not save video with av when process recorded video!')

        fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        self.container = av.open(output_video, mode='w')
        # NOTICE: 'h264' is required, otherwise the front end cannot display
        # the video. The rate is only nominal: each frame gets its own pts
        # in save_video_with_av.
        self.stream = self.container.add_stream('h264', rate=fps)
        self.stream.width = width
        self.stream.height = height
        self.stream.pix_fmt = 'yuv420p'
        self.stream.codec_context.time_base = Fraction(1, fps)

    def is_ok(self):
        """Return True when the capture exists and is opened."""
        if self.cap and self.cap.isOpened():
            return True
        logger.debug('Camera not ready!!!')
        return False

    def check_rotation(self, rotate):
        """Map a rotation angle to the cv2 rotation code that undoes it.

        Args:
            rotate: Rotation in degrees; converted with ``int()``.

        Returns:
            The cv2 rotate code for 90, 180 or 270 degrees, otherwise None.
        """
        rotate_code = None
        angle = int(rotate)
        if angle == 270:
            rotate_code = cv2.ROTATE_90_CLOCKWISE
        elif angle == 180:
            rotate_code = cv2.ROTATE_180
        elif angle == 90:
            rotate_code = cv2.ROTATE_90_COUNTERCLOCKWISE
        return rotate_code

    def correct_rotation(self, frame, rotate_code):
        """Return ``frame`` rotated by the given cv2 rotate code."""
        return cv2.rotate(frame, rotate_code)

    def process(self, save=True):
        """Read one frame, rotate it if configured and optionally save it.

        Args:
            save (bool, optional): Save the frame to the configured output
                when it was read successfully. Defaults to True.

        Returns:
            tuple: ``(success, image)`` as returned by the capture, with the
            image rotated when ``rotate_code`` is set.

        Raises:
            cv2.error: Re-raised after logging when reading fails.
        """
        try:
            success, image = self.cap.read()
            if not success:
                logger.debug('Ignoring empty camera frame.')
            elif self.rotate_code is not None:
                # Only rotate real frames: a failed read yields no image and
                # rotating it would raise a misleading cv2.error.
                image = self.correct_rotation(image, self.rotate_code)
        except cv2.error as exc:
            logger.error(
                'read data from camera(%s) failed, it may be disconnected: %s',
                self.__class__.__name__, exc)
            raise

        t_read = time.time()
        if success and save:
            self.save_video(image, t_read)
        return success, image

    def save_video(self, image, t_read):
        """Save ``image`` with av or OpenCV, depending on ``save_with_av``.

        Args:
            image: Frame to save.
            t_read (float): Timestamp at which the frame was read.
        """
        if self.save_with_av:
            self.save_video_with_av(image, t_read)
        else:
            self.save_video_with_opencv(image)

    def save_video_with_opencv(self, image):
        """Write ``image`` to the OpenCV writer; failures are only logged."""
        if not self.out_stream:
            return
        try:
            if not self.out_stream.isOpened():
                # Explicit check instead of `assert` so it is not stripped
                # under `python -O`.
                logger.error('Fail to save video %s: %s', self.out_stream,
                             'Cannot open video for writing')
                return
            self.out_stream.write(image)
        except Exception as exc:
            # Best effort: a failed write must not interrupt frame analysis.
            logger.error('Fail to save video %s: %s', self.out_stream, exc)

    def save_video_with_av(self, image, t_start):
        """Save video with [av](https://github.com/PyAV-Org/PyAV)

        Frames whose pts does not advance past the previously written one
        are skipped.

        Args:
            image (np.ndarray): frame to save
            t_start (float): timestamp of this frame
        """
        if not self.container:
            return
        try:
            if not self.t_start_save_video:
                self.t_start_save_video = t_start
            # Presentation Time Stamp (seconds -> counts of time_base)
            delta_t = t_start - self.t_start_save_video
            if delta_t < 0.0:
                return
            pts = int(round(delta_t / self.stream.codec_context.time_base))
            logger.debug('pts: %d', pts)
            if pts <= self.previous_pts:
                return
            frame = av.VideoFrame.from_ndarray(image, format='bgr24')
            frame.pts = pts
            self.previous_pts = pts
            for packet in self.stream.encode(frame):
                self.container.mux(packet)
        except ValueError as exc:
            logger.debug('Fail to save frame of video %s: %s', self.container,
                         exc)

    def release_container(self):
        """Finish the av output (if any) and reset the av saving state."""
        try:
            if self.t_start_save_video:
                self.av_finish_with_a_blank_frame()
        finally:
            # Close the file even if writing the final frame failed.
            if self.container:
                self.container.close()
            self.t_start_save_video = None
            self.previous_pts = -1

    def av_finish_with_a_blank_frame(self):
        """Append a black frame and flush the av encoder.

        Finishing with a blank frame makes the "last" real frame actually
        get shown for some time; the black frame will probably be shown for
        at least 1/fps, according to ffprobe.
        """
        try:
            image = np.zeros((self.stream.height, self.stream.width, 3),
                             dtype=np.uint8)
            frame = av.VideoFrame.from_ndarray(image, format='bgr24')
            pts = int(
                round((time.time() - self.t_start_save_video) /
                      self.stream.codec_context.time_base))
            logger.debug('last pts: %d', pts)
            # pts must keep increasing past the last written frame.
            frame.pts = pts if pts > self.previous_pts else self.previous_pts + 1
            for packet in self.stream.encode(frame):
                self.container.mux(packet)
            # Flush stream
            for packet in self.stream.encode():
                self.container.mux(packet)
        except ValueError as exc:
            logger.debug('Fail to save frame of video %s: %s', self.container,
                         exc)

    def generator(self):
        """Yield processed frames as multipart JPEG chunks.

        Each yielded chunk has content type image/jpeg. Frames that could
        not be read or could not be encoded are skipped.
        """
        while self.is_ok():
            success, frame = self.process()
            if not success:
                continue
            # OpenCV frames are raw arrays; motion JPEG needs each frame
            # encoded as a JPEG image first.
            encoded, jpeg = cv2.imencode('.jpg', frame)
            if not encoded:
                logger.debug('Fail to encode frame as JPEG, frame skipped.')
                continue
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() +
                   b'\r\n\r\n')
def create_data_dir(subject_id, train_id):
    """Create (if needed) the folder used to store video data.

    The folder is ``{settings.DATA_PATH}/{subject_id}/{train_id}``.

    Args:
        subject_id: Value used as the first sub-folder name.
        train_id: Value used as the second sub-folder name.

    Returns:
        str: The folder path. It is returned even when the folder could not
        be created; that failure is logged, not raised.
    """
    path = f'{settings.DATA_PATH}/{subject_id}/{train_id}'
    try:
        os.makedirs(path)
    except FileExistsError:
        logger.debug('Folder already exists!')
    except OSError as exc:
        # Any other failure (permissions, invalid path, ...) is not an
        # "already exists" case and must be visible in the logs.
        logger.error('Fail to create folder %s: %s', path, exc)
    return path
def json_generator(feeder):
    """Yield the data produced by ``feeder`` as ``data:<json>`` messages.

    Runs as long as ``feeder.is_ok()`` is true. Each successful
    ``feeder.process(only_keypoint=False)`` result is serialized with
    ``json.dumps`` and yielded as ``data:<json>`` followed by a blank line
    (presumably for a Server-Sent Events response; confirm with the caller).
    Unsuccessful results are skipped.

    Args:
        feeder: Object providing ``is_ok()`` and
            ``process(only_keypoint=False)`` returning a 3-tuple
            ``(success, <unused>, data)``.
    """
    while True:
        if not feeder.is_ok():
            return
        ok, _unused, payload = feeder.process(only_keypoint=False)
        if not ok:
            continue
        yield 'data:' + json.dumps(payload) + '\n\n'
|