test_video.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import unittest
  2. import time
  3. import os
  4. import shutil
  5. from glob import glob
  6. import cv2
  7. import numpy as np
  8. from device.video import VideoCaptureThread
  9. from device.trigger_box import TriggerNeuracle
  10. from device.data_client import NeuracleDataClient
  11. trigger_port = 'COM6'
  12. def get_video_length(file_path):
  13. cap = cv2.VideoCapture(file_path)
  14. if not cap.isOpened():
  15. return None
  16. # Get the frames per second (fps) and total number of frames
  17. fps = cap.get(cv2.CAP_PROP_FPS)
  18. total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
  19. # Calculate the duration of the video in seconds
  20. duration = total_frames / fps
  21. # Release the video capture object
  22. cap.release()
  23. return duration
  24. class TestVideo(unittest.TestCase):
  25. def test_video_recording(self):
  26. output_dir = './tests/data/video'
  27. video_cam = VideoCaptureThread(output_dir=output_dir, video_source=1)
  28. time.sleep(10)
  29. video_cam.close()
  30. # read video files
  31. file = glob(os.path.join(output_dir, '*.mp4'))[0]
  32. duration = get_video_length(file)
  33. self.assertTrue(isinstance(duration, float))
  34. self.assertTrue(duration > 0)
  35. shutil.rmtree(output_dir)
  36. def test_video_sync(self):
  37. output_dir = './tests/data/video'
  38. trigger = TriggerNeuracle(port=trigger_port)
  39. data_client = NeuracleDataClient(buffer_len=10.)
  40. video_cam = VideoCaptureThread(output_dir=output_dir, video_source=1, sync_device=trigger)
  41. time.sleep(5)
  42. video_cam.close()
  43. events = data_client.get_trial_data(clear=True)[1]
  44. self.assertEqual(len(events), 1)
  45. self.assertAlmostEqual(events[0, 2], 255)
  46. data_client.close()
  47. shutil.rmtree(output_dir)