import unittest
import time
import os
import shutil
from glob import glob

import cv2
import numpy as np

from device.video import VideoCaptureThread
from device.trigger_box import TriggerNeuracle
from device.data_client import NeuracleDataClient


def get_video_length(file_path):
    """Return the duration of a video file in seconds.

    The duration is computed as the frame count divided by the frame rate,
    both as reported by OpenCV for ``file_path``.

    Args:
        file_path: Path of the video file to inspect.

    Returns:
        The duration in seconds as a float, or ``None`` when the file cannot
        be opened or the reported frame rate is not a positive number.
    """
    cap = cv2.VideoCapture(file_path)
    try:
        if not cap.isOpened():
            return None

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # A zero (or NaN) frame rate would otherwise raise ZeroDivisionError
        # or yield a meaningless value; report "unknown duration" instead.
        if not fps > 0:
            return None

        return total_frames / fps
    finally:
        # Release the capture object on every path, including the early
        # returns above and any exception raised while querying properties.
        cap.release()


def _remove_dir_if_present(path):
    """Delete the directory tree at ``path`` if it exists."""
    # Errors from rmtree itself are deliberately not suppressed, so a file
    # that cannot be deleted still surfaces as a test error.
    if os.path.isdir(path):
        shutil.rmtree(path)


class TestVideo(unittest.TestCase):
    """Integration tests for ``VideoCaptureThread``.

    NOTE(review): these tests construct real capture/trigger/data-client
    objects, so they presumably need the corresponding hardware attached --
    confirm before running in CI.
    """

    # Directory handed to VideoCaptureThread; removed again after each test.
    OUTPUT_DIR = './tests/data/video'

    def test_video_recording(self):
        """Record for 10 seconds and check that a playable .mp4 was written."""
        output_dir = self.OUTPUT_DIR
        # Registered up front so the directory is removed even when an
        # assertion below fails; otherwise a stale recording could be picked
        # up by the glob of a later run.
        self.addCleanup(_remove_dir_if_present, output_dir)

        video_cam = VideoCaptureThread(output_dir=output_dir, video_source=1)
        try:
            time.sleep(10)
        finally:
            video_cam.close()

        # Read back the recorded video file.
        video_files = glob(os.path.join(output_dir, '*.mp4'))
        self.assertTrue(
            video_files, 'no .mp4 file was written to {}'.format(output_dir))

        duration = get_video_length(video_files[0])
        self.assertIsInstance(duration, float)
        self.assertGreater(duration, 0)

    def test_video_sync(self):
        """Record with a sync device and check exactly one event is received."""
        output_dir = self.OUTPUT_DIR
        self.addCleanup(_remove_dir_if_present, output_dir)

        trigger = TriggerNeuracle()
        data_client = NeuracleDataClient(buffer_len=10.)
        # Cleanups run in reverse registration order, so the client is closed
        # before the output directory is removed, and both happen even when
        # an assertion fails.
        self.addCleanup(data_client.close)

        video_cam = VideoCaptureThread(
            output_dir=output_dir, video_source=1, sync_device=trigger)
        try:
            time.sleep(5)
        finally:
            video_cam.close()

        events = data_client.get_trial_data(clear=True)[1]
        self.assertEqual(len(events), 1)
        # NOTE(review): presumably column 2 holds the trigger code and 255 is
        # the value sent by the video thread -- confirm against the device
        # modules.
        self.assertAlmostEqual(events[0, 2], 255)