12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- """ Common function for camera based method """
- import json
- import logging
- import os
- import time
- from datetime import datetime
- import threading
- import cv2
- import numpy as np
- from settings.config import settings
- logger = logging.getLogger(__name__)
class VideoCaptureThread:
    """Record frames from a camera to an MP4 file on a background thread.

    Constructing an instance opens the capture device, creates the output
    file and immediately starts the recording thread.  Call :meth:`close`
    to stop recording and release the capture and the writer.
    """

    # Settings requested from the capture device and used for the writer.
    FPS = 30.0
    FRAME_WIDTH = 1280
    FRAME_HEIGHT = 720

    def __init__(self, output_dir, video_source=0, sync_device=None):
        """Open the capture, prepare the output file and start recording.

        Args:
            output_dir: Directory the recording is written to; created if
                it does not exist.
            video_source: Source passed to ``cv2.VideoCapture`` (opened
                with the ``cv2.CAP_DSHOW`` backend).
            sync_device: Optional object exposing ``send_trigger(code)``.
                When given, it is triggered with ``0xff`` once the first
                frame has been read.

        Raises:
            RuntimeError: If the capture device cannot be opened.
        """
        super().__init__()
        self.video_source = video_source
        self.cap = cv2.VideoCapture(self.video_source, cv2.CAP_DSHOW)
        # isOpened() does not change without re-opening the device, so
        # polling it in a loop would spin forever on a missing camera.
        if not self.cap.isOpened():
            self.cap.release()
            raise RuntimeError(
                f"Could not open video source {self.video_source!r}"
            )
        self.cap.set(cv2.CAP_PROP_FPS, self.FPS)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.FRAME_WIDTH)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.FRAME_HEIGHT)

        self.sync_device = sync_device

        date_time_str = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
        os.makedirs(output_dir, exist_ok=True)
        self.output_path = os.path.join(
            output_dir, f'video_recording_{date_time_str}.mp4'
        )

        # The device may not honour the requested resolution; size the
        # writer from what the capture reports, falling back to the
        # requested values when the backend reports 0.
        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or self.FRAME_WIDTH
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or self.FRAME_HEIGHT
        self.out = cv2.VideoWriter(
            self.output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            self.FPS,
            (width, height),
        )
        if not self.out.isOpened():
            logger.error("Could not open video writer for %s", self.output_path)

        # Set by close() to ask the recording loop to finish.
        self._stop_event = threading.Event()
        self.videothread = threading.Thread(target=self.run)
        self.videothread.start()

    def run(self):
        """Thread entry point: log the start and run the capture loop."""
        logger.info("Camera starting")
        self.capture_video()

    def capture_video(self):
        """Read frames from the capture and write them until stopped.

        The first frame is read only to confirm the camera delivers
        data; it is not written to the file.  After it, the sync device
        (if any) is triggered and every following frame is recorded.
        The loop ends when :meth:`close` is called, the capture is no
        longer open, or a frame cannot be read.
        """
        ret, _ = self.cap.read()
        if not ret:
            logger.error("Error: Couldn't read frame. Exit.")
            return
        # synchronize after getting the first frame
        if self.sync_device is not None:
            self.sync_device.send_trigger(0xff)  # 255 for video ready
        while not self._stop_event.is_set() and self.cap.isOpened():
            ret, frame = self.cap.read()
            if not ret:
                # A failed read yields no frame; writing it would raise.
                logger.error("Error: Couldn't read frame. Stop recording.")
                break
            # TODO: online analysis (500 ms step, asynchronous)
            self.out.write(frame)

    def close(self):
        """Stop the recording thread, then release capture and writer."""
        logger.info("Camera ended")
        # Stop and join the worker first so the capture and writer are
        # never released while the loop is still using them.
        self._stop_event.set()
        self.videothread.join()
        self.cap.release()
        self.out.release()
|