""" Common function for camera based method """ import json import logging import os import time from datetime import datetime import threading import cv2 import numpy as np from settings.config import settings logger = logging.getLogger(__name__) class VideoCaptureThread: def __init__(self, output_dir, video_source=0, sync_device=None): super(VideoCaptureThread, self).__init__() self.video_source = video_source self.cap = cv2.VideoCapture(self.video_source, cv2.CAP_DSHOW) while not self.cap.isOpened(): pass # Wait for the capture to be initialized self.cap.set(cv2.CAP_PROP_FPS, 30.0) self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280), self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) self.sync_device = sync_device now = datetime.now() date_time_str = now.strftime("%m-%d-%Y-%H-%M-%S") try: os.makedirs(output_dir) except FileExistsError: pass self.output_path = os.path.join(output_dir, f'video_recording_{date_time_str}.mp4') self.out = cv2.VideoWriter(self.output_path, cv2.VideoWriter_fourcc(*'mp4v'), 30.0, (1280, 720)) self.videothread = threading.Thread(target=self.run) self.videothread.start() def run(self): logger.info("Camera starting") self.capture_video() def capture_video(self): ret, frame = self.cap.read() if not ret: logger.error("Error: Couldn't read frame. Exit.") return # synchronize after getting the first frame if self.sync_device is not None: self.sync_device.send_trigger(0xff) # 255 for video ready while self.cap.isOpened(): ret, frame = self.cap.read() # TODO: online analysis (500ms step, asychronize) self.out.write(frame) def close(self): logger.info("Camera ended") self.cap.release() self.out.release() self.videothread.join()