test_utils.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. """Test for video analyser """
  2. import os
  3. import time
  4. from unittest.mock import patch
  5. from unittest.mock import MagicMock
  6. import cv2
  7. import numpy as np
  8. from core.utils import VideoAnalyser
  9. TEST_DATA_PATH = 'tests/data/'
  10. INPUT_VIDEO_PATH = os.path.join(TEST_DATA_PATH, 'normal_side.mp4')
  11. OUTPUT_VIDEO_PATH = os.path.join(TEST_DATA_PATH, 'test_base.mp4')
  12. def setup_module():
  13. if not os.path.exists(TEST_DATA_PATH):
  14. os.makedirs(TEST_DATA_PATH)
  15. def teardown_function():
  16. if os.path.exists(OUTPUT_VIDEO_PATH):
  17. os.remove(OUTPUT_VIDEO_PATH)
  18. def gen_fake_image():
  19. return np.zeros((640, 320, 3), dtype=np.uint8)
  20. class TestVideoAnalyser:
  21. def test_init_without_input_video_is_camera(self):
  22. analyser = VideoAnalyser(input_video=None)
  23. assert analyser.is_camera
  24. def test_init_with_input_video_is_not_camera(self):
  25. mock_video_capture = MagicMock()
  26. mock_video_capture.release = MagicMock()
  27. with patch('cv2.VideoCapture', mock_video_capture):
  28. analyser = VideoAnalyser(input_video=INPUT_VIDEO_PATH)
  29. assert not analyser.is_camera
  30. def test_close_with_opencv_release_resource(self):
  31. analyser = VideoAnalyser()
  32. analyser.set_output_video(output_video='output.mp4', save_with_av=False)
  33. analyser.close()
  34. assert not analyser.cap.isOpened()
  35. assert not analyser.out_stream
  36. def test_close_with_av_release_resource(self):
  37. mock_release_container = MagicMock()
  38. analyser = VideoAnalyser()
  39. analyser.set_output_video(output_video='output.mp4', save_with_av=True)
  40. analyser.release_container = mock_release_container
  41. analyser.close()
  42. assert not analyser.cap.isOpened()
  43. assert not analyser.container
  44. assert mock_release_container.called
  45. def test_set_output_video_with_camera_and_opencv_success(self):
  46. analyser = VideoAnalyser(camera_id=0)
  47. analyser.open_camera()
  48. analyser.set_output_video(output_video='output.mp4', save_with_av=False)
  49. assert analyser.out_stream
  50. def test_set_output_video_with_camera_and_av_success(self):
  51. analyser = VideoAnalyser(camera_id=0)
  52. analyser.open_camera()
  53. analyser.set_output_video(output_video='output.mp4', save_with_av=True)
  54. assert analyser.stream
  55. assert analyser.container
  56. def test_set_output_video_with_video_and_opencv_success(self):
  57. analyser = VideoAnalyser(input_video=INPUT_VIDEO_PATH)
  58. analyser.set_output_video(output_video='output.mp4', save_with_av=False)
  59. assert analyser.out_stream
  60. def test_set_output_video_with_video_and_av_success(self):
  61. analyser = VideoAnalyser()
  62. analyser.set_output_video(output_video='output.mp4', save_with_av=True)
  63. assert analyser.stream
  64. assert analyser.container
  65. def test_is_ok_before_cap_open_return_false(self):
  66. with patch('cv2.VideoCapture') as mock_cap:
  67. mock_cap_instance = mock_cap.return_value
  68. mock_cap_instance.isOpened.return_value = False
  69. analyser = VideoAnalyser()
  70. assert not analyser.is_ok()
  71. def test_is_ok_after_cap_open_return_true(self):
  72. with patch('cv2.VideoCapture') as mock_cap:
  73. mock_cap_instance = mock_cap.return_value
  74. mock_cap_instance.isOpened.return_value = True
  75. analyser = VideoAnalyser()
  76. assert analyser.is_ok()
  77. def test_process_with_save_when_read_success_save_video(self):
  78. with patch('cv2.VideoCapture') as mock_cap:
  79. mock_cap_instance = mock_cap.return_value
  80. mock_cap_instance.read.return_value = (True, gen_fake_image())
  81. mock_save_video = MagicMock()
  82. analyser = VideoAnalyser()
  83. analyser.save_video = mock_save_video
  84. analyser.process(save=True)
  85. assert mock_save_video.called
  86. def test_process_with_save_when_read_failed_not_save_video(self):
  87. with patch('cv2.VideoCapture') as mock_cap:
  88. mock_cap_instance = mock_cap.return_value
  89. mock_cap_instance.read.return_value = (False, None)
  90. mock_save_video = MagicMock()
  91. analyser = VideoAnalyser()
  92. analyser.save_video = mock_save_video
  93. analyser.process(save=True)
  94. assert not mock_save_video.called
  95. def test_process_without_save_when_read_success_not_save_video(self):
  96. with patch('cv2.VideoCapture') as mock_cap:
  97. mock_cap_instance = mock_cap.return_value
  98. mock_cap_instance.read.return_value = (False, None)
  99. mock_save_video = MagicMock()
  100. analyser = VideoAnalyser()
  101. analyser.save_video = mock_save_video
  102. analyser.process(save=False)
  103. assert not mock_save_video.called
  104. def test_save_video_with_av_av_function_called(self):
  105. mock_save_video_with_av = MagicMock()
  106. analyser = VideoAnalyser()
  107. analyser.save_video_with_av = mock_save_video_with_av
  108. analyser.save_with_av = True
  109. analyser.save_video(gen_fake_image(), time.time())
  110. assert mock_save_video_with_av.called
  111. def test_save_video_without_av_opencv_function_called(self):
  112. mock_save_video_with_opencv = MagicMock()
  113. analyser = VideoAnalyser()
  114. analyser.save_video_with_opencv = mock_save_video_with_opencv
  115. analyser.save_with_av = False
  116. analyser.save_video(gen_fake_image(), None)
  117. assert mock_save_video_with_opencv.called
  118. def test_save_video_with_opencv_before_set_output_video_pass(self):
  119. analyser = VideoAnalyser()
  120. analyser.save_video_with_opencv(gen_fake_image())
  121. def test_save_video_with_opencv_with_out_stream_log_error(self):
  122. mock_out_stream = MagicMock()
  123. mock_out_stream.isOpened.return_value = False
  124. analyser = VideoAnalyser()
  125. analyser.out_stream = mock_out_stream
  126. mock_logging_error = MagicMock()
  127. with patch('core.utils.logger.error', mock_logging_error):
  128. analyser.save_video_with_opencv(gen_fake_image())
  129. assert mock_logging_error.called
  130. def test_save_video_with_av_before_set_output_video_pass(self):
  131. analyser = VideoAnalyser()
  132. analyser.save_video_with_av(gen_fake_image(), time.time())
  133. def test_save_video_with_av_with_antecedent_frame_skipped(self):
  134. analyser = VideoAnalyser()
  135. analyser.container = MagicMock()
  136. analyser.stream = MagicMock()
  137. mock_encode = MagicMock()
  138. analyser.stream.encode = mock_encode
  139. t_start = time.time()
  140. analyser.t_start_save_video = t_start + 12
  141. analyser.save_video_with_av(gen_fake_image(), t_start)
  142. assert not mock_encode.called
  143. def test_save_video_with_av_with_valid_frame_success(self):
  144. analyser = VideoAnalyser()
  145. # analyser = VideoAnalyser(input_video=INPUT_VIDEO_PATH)
  146. # analyser.set_output_video(output_video='output.mp4', save_with_av=True)
  147. analyser.container = MagicMock()
  148. analyser.container.mux = MagicMock()
  149. analyser.stream = MagicMock()
  150. mock_encode = MagicMock()
  151. analyser.stream.encode = mock_encode
  152. t_start = time.time()
  153. analyser.t_start_save_video = t_start - 1
  154. analyser.save_video_with_av(gen_fake_image(), t_start)
  155. assert mock_encode.called
  156. def test_release_container_without_save_not_finish_with_a_blank_frame(self):
  157. mock_av_finish_with_a_blank_frame = MagicMock()
  158. analyser = VideoAnalyser()
  159. analyser.av_finish_with_a_blank_frame = \
  160. mock_av_finish_with_a_blank_frame
  161. analyser.set_output_video(output_video='output.mp4', save_with_av=True)
  162. analyser.release_container()
  163. assert not mock_av_finish_with_a_blank_frame.called
  164. def test_release_container_reset_time_start_save_video(self):
  165. mock_av_finish_with_a_blank_frame = MagicMock()
  166. analyser = VideoAnalyser()
  167. analyser.av_finish_with_a_blank_frame = \
  168. mock_av_finish_with_a_blank_frame
  169. analyser.t_start_save_video = time.time()
  170. analyser.release_container()
  171. assert not analyser.t_start_save_video
  172. def test_release_container_reset_previous_pts(self):
  173. mock_av_finish_with_a_blank_frame = MagicMock()
  174. analyser = VideoAnalyser()
  175. analyser.av_finish_with_a_blank_frame = \
  176. mock_av_finish_with_a_blank_frame
  177. analyser.previous_pts = 134
  178. analyser.release_container()
  179. assert 0 == analyser.previous_pts
  180. def test_main():
  181. analyser = VideoAnalyser()
  182. # analyser.set_output_video(output_video=OUTPUT_VIDEO_PATH, save_with_av=True)
  183. analyser.set_output_video(output_video=OUTPUT_VIDEO_PATH)
  184. count = 0
  185. while analyser.is_ok():
  186. count += 1
  187. if count == 196:
  188. break
  189. _, image = analyser.process()
  190. cv2.imshow('base', image)
  191. if cv2.waitKey(1) & 0xFF == ord('q'): #press q to quit
  192. break
  193. cv2.destroyAllWindows()