123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- """Test for video analyser """
- import os
- import time
- from unittest.mock import patch
- from unittest.mock import MagicMock
- import cv2
- import numpy as np
- from core.utils import VideoAnalyser
- TEST_DATA_PATH = 'tests/data/'
- INPUT_VIDEO_PATH = os.path.join(TEST_DATA_PATH, 'normal_side.mp4')
- OUTPUT_VIDEO_PATH = os.path.join(TEST_DATA_PATH, 'test_base.mp4')
- def setup_module():
- if not os.path.exists(TEST_DATA_PATH):
- os.makedirs(TEST_DATA_PATH)
- def teardown_function():
- if os.path.exists(OUTPUT_VIDEO_PATH):
- os.remove(OUTPUT_VIDEO_PATH)
- def gen_fake_image():
- return np.zeros((640, 320, 3), dtype=np.uint8)
- class TestVideoAnalyser:
- def test_init_without_input_video_is_camera(self):
- analyser = VideoAnalyser(input_video=None)
- assert analyser.is_camera
- def test_init_with_input_video_is_not_camera(self):
- mock_video_capture = MagicMock()
- mock_video_capture.release = MagicMock()
- with patch('cv2.VideoCapture', mock_video_capture):
- analyser = VideoAnalyser(input_video=INPUT_VIDEO_PATH)
- assert not analyser.is_camera
- def test_close_with_opencv_release_resource(self):
- analyser = VideoAnalyser()
- analyser.set_output_video(output_video='output.mp4', save_with_av=False)
- analyser.close()
- assert not analyser.cap.isOpened()
- assert not analyser.out_stream
- def test_close_with_av_release_resource(self):
- mock_release_container = MagicMock()
- analyser = VideoAnalyser()
- analyser.set_output_video(output_video='output.mp4', save_with_av=True)
- analyser.release_container = mock_release_container
- analyser.close()
- assert not analyser.cap.isOpened()
- assert not analyser.container
- assert mock_release_container.called
- def test_set_output_video_with_camera_and_opencv_success(self):
- analyser = VideoAnalyser(camera_id=0)
- analyser.open_camera()
- analyser.set_output_video(output_video='output.mp4', save_with_av=False)
- assert analyser.out_stream
- def test_set_output_video_with_camera_and_av_success(self):
- analyser = VideoAnalyser(camera_id=0)
- analyser.open_camera()
- analyser.set_output_video(output_video='output.mp4', save_with_av=True)
- assert analyser.stream
- assert analyser.container
- def test_set_output_video_with_video_and_opencv_success(self):
- analyser = VideoAnalyser(input_video=INPUT_VIDEO_PATH)
- analyser.set_output_video(output_video='output.mp4', save_with_av=False)
- assert analyser.out_stream
- def test_set_output_video_with_video_and_av_success(self):
- analyser = VideoAnalyser()
- analyser.set_output_video(output_video='output.mp4', save_with_av=True)
- assert analyser.stream
- assert analyser.container
- def test_is_ok_before_cap_open_return_false(self):
- with patch('cv2.VideoCapture') as mock_cap:
- mock_cap_instance = mock_cap.return_value
- mock_cap_instance.isOpened.return_value = False
- analyser = VideoAnalyser()
- assert not analyser.is_ok()
- def test_is_ok_after_cap_open_return_true(self):
- with patch('cv2.VideoCapture') as mock_cap:
- mock_cap_instance = mock_cap.return_value
- mock_cap_instance.isOpened.return_value = True
- analyser = VideoAnalyser()
- assert analyser.is_ok()
- def test_process_with_save_when_read_success_save_video(self):
- with patch('cv2.VideoCapture') as mock_cap:
- mock_cap_instance = mock_cap.return_value
- mock_cap_instance.read.return_value = (True, gen_fake_image())
- mock_save_video = MagicMock()
- analyser = VideoAnalyser()
- analyser.save_video = mock_save_video
- analyser.process(save=True)
- assert mock_save_video.called
- def test_process_with_save_when_read_failed_not_save_video(self):
- with patch('cv2.VideoCapture') as mock_cap:
- mock_cap_instance = mock_cap.return_value
- mock_cap_instance.read.return_value = (False, None)
- mock_save_video = MagicMock()
- analyser = VideoAnalyser()
- analyser.save_video = mock_save_video
- analyser.process(save=True)
- assert not mock_save_video.called
- def test_process_without_save_when_read_success_not_save_video(self):
- with patch('cv2.VideoCapture') as mock_cap:
- mock_cap_instance = mock_cap.return_value
- mock_cap_instance.read.return_value = (False, None)
- mock_save_video = MagicMock()
- analyser = VideoAnalyser()
- analyser.save_video = mock_save_video
- analyser.process(save=False)
- assert not mock_save_video.called
- def test_save_video_with_av_av_function_called(self):
- mock_save_video_with_av = MagicMock()
- analyser = VideoAnalyser()
- analyser.save_video_with_av = mock_save_video_with_av
- analyser.save_with_av = True
- analyser.save_video(gen_fake_image(), time.time())
- assert mock_save_video_with_av.called
- def test_save_video_without_av_opencv_function_called(self):
- mock_save_video_with_opencv = MagicMock()
- analyser = VideoAnalyser()
- analyser.save_video_with_opencv = mock_save_video_with_opencv
- analyser.save_with_av = False
- analyser.save_video(gen_fake_image(), None)
- assert mock_save_video_with_opencv.called
- def test_save_video_with_opencv_before_set_output_video_pass(self):
- analyser = VideoAnalyser()
- analyser.save_video_with_opencv(gen_fake_image())
- def test_save_video_with_opencv_with_out_stream_log_error(self):
- mock_out_stream = MagicMock()
- mock_out_stream.isOpened.return_value = False
- analyser = VideoAnalyser()
- analyser.out_stream = mock_out_stream
- mock_logging_error = MagicMock()
- with patch('core.utils.logger.error', mock_logging_error):
- analyser.save_video_with_opencv(gen_fake_image())
- assert mock_logging_error.called
- def test_save_video_with_av_before_set_output_video_pass(self):
- analyser = VideoAnalyser()
- analyser.save_video_with_av(gen_fake_image(), time.time())
- def test_save_video_with_av_with_antecedent_frame_skipped(self):
- analyser = VideoAnalyser()
- analyser.container = MagicMock()
- analyser.stream = MagicMock()
- mock_encode = MagicMock()
- analyser.stream.encode = mock_encode
- t_start = time.time()
- analyser.t_start_save_video = t_start + 12
- analyser.save_video_with_av(gen_fake_image(), t_start)
- assert not mock_encode.called
- def test_save_video_with_av_with_valid_frame_success(self):
- analyser = VideoAnalyser()
- # analyser = VideoAnalyser(input_video=INPUT_VIDEO_PATH)
- # analyser.set_output_video(output_video='output.mp4', save_with_av=True)
- analyser.container = MagicMock()
- analyser.container.mux = MagicMock()
- analyser.stream = MagicMock()
- mock_encode = MagicMock()
- analyser.stream.encode = mock_encode
- t_start = time.time()
- analyser.t_start_save_video = t_start - 1
- analyser.save_video_with_av(gen_fake_image(), t_start)
- assert mock_encode.called
- def test_release_container_without_save_not_finish_with_a_blank_frame(self):
- mock_av_finish_with_a_blank_frame = MagicMock()
- analyser = VideoAnalyser()
- analyser.av_finish_with_a_blank_frame = \
- mock_av_finish_with_a_blank_frame
- analyser.set_output_video(output_video='output.mp4', save_with_av=True)
- analyser.release_container()
- assert not mock_av_finish_with_a_blank_frame.called
- def test_release_container_reset_time_start_save_video(self):
- mock_av_finish_with_a_blank_frame = MagicMock()
- analyser = VideoAnalyser()
- analyser.av_finish_with_a_blank_frame = \
- mock_av_finish_with_a_blank_frame
- analyser.t_start_save_video = time.time()
- analyser.release_container()
- assert not analyser.t_start_save_video
- def test_release_container_reset_previous_pts(self):
- mock_av_finish_with_a_blank_frame = MagicMock()
- analyser = VideoAnalyser()
- analyser.av_finish_with_a_blank_frame = \
- mock_av_finish_with_a_blank_frame
- analyser.previous_pts = 134
- analyser.release_container()
- assert 0 == analyser.previous_pts
- def test_main():
- analyser = VideoAnalyser()
- # analyser.set_output_video(output_video=OUTPUT_VIDEO_PATH, save_with_av=True)
- analyser.set_output_video(output_video=OUTPUT_VIDEO_PATH)
- count = 0
- while analyser.is_ok():
- count += 1
- if count == 196:
- break
- _, image = analyser.process()
- cv2.imshow('base', image)
- if cv2.waitKey(1) & 0xFF == ord('q'): #press q to quit
- break
- cv2.destroyAllWindows()
|