123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- """Module tests/core/sig_chain/test_receive provides tests for the receiver."""
- import pytest
- import time
- import unittest
- from unittest import mock
- from func_timeout import FunctionTimedOut
- from core.sig_chain.device.connector_interface import DataMode
- from core.sig_chain.device.connector_interface import Device
- from core.sig_chain.sig_receive import Receiver
def teardown_function():
    """Run ``Receiver.clear_instance()`` after every test in this module.

    pytest invokes a module-level ``teardown_function`` once each test
    function finishes. Presumably this drops the shared ``Receiver`` so state
    set by one test does not leak into the next -- confirm against
    ``Receiver.clear_instance``.
    """
    Receiver.clear_instance()
def test_new_receiver_is_not_ready():
    """A freshly constructed receiver reports a falsy ``is_ready``."""
    assert not Receiver().is_ready
def test_before_select_can_not_setup_connector():
    """``setup_connector`` raises ``AssertionError`` if nothing was selected."""
    unselected = Receiver()
    with pytest.raises(AssertionError):
        # No select_connector() call was made on this receiver beforehand.
        unselected.setup_connector()
def test_before_setup_connector_receive_data_failed():
    """``start_receive_wave`` raises ``AssertionError`` on an unprepared receiver.

    The receiver here has had neither ``select_connector`` nor
    ``setup_connector`` called on it.
    """
    unprepared = Receiver()
    with pytest.raises(AssertionError):
        unprepared.start_receive_wave()
def test_before_setup_connector_get_data_from_buffer_failed():
    """Reading a buffer before ``setup_connector`` raises ``RuntimeError``.

    Checked in turn for the ``'plot'`` and ``'classify_online'`` buffers.
    Each buffer's ``get_sig`` is stubbed to return an ``'ok'`` payload right
    before the read -- presumably so the failure cannot be blamed on the
    buffer content itself.
    """
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # (attribute holding the buffer, buffer type passed to the receiver)
    cases = (
        ('buffer_plot', 'plot'),
        ('buffer_classify_online', 'classify_online'),
    )
    for buffer_attr, buffer_type in cases:
        getattr(rx, buffer_attr).get_sig = \
            mock.MagicMock(return_value={'status': 'ok'})
        with pytest.raises(RuntimeError):
            rx.get_data_from_buffer(buffer_type)
def test_before_setup_connector_stop_receive_pass():
    """``stop_receive`` does not raise when the connector is only selected.

    There is no explicit assertion: the test passes as long as the call
    returns without an exception.
    """
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)
    rx.stop_receive()
def test_after_setup_connector_is_ready():
    """``is_ready`` is truthy after ``setup_connector`` with a ready connector."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # Stub the connector so that it reports itself as ready.
    ready_stub = mock.MagicMock(return_value=True)
    rx.connector.get_ready = ready_stub

    rx.setup_connector()
    assert rx.is_ready
def test_after_setup_wave_receive_mode_is_ready():
    """``is_ready`` is truthy once wave mode was set up successfully."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # The connector's wave-mode setup is stubbed to report success.
    wave_setup_stub = mock.MagicMock(return_value=True)
    rx.connector.setup_wave_mode = wave_setup_stub

    rx.setup_receive_mode(DataMode.WAVE)
    assert rx.is_ready
def test_after_setup_impedance_receive_mode_is_ready():
    """``is_ready`` is truthy once impedance mode was set up successfully."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # The connector's impedance-mode setup is stubbed to report success.
    impedance_setup_stub = mock.MagicMock(return_value=True)
    rx.connector.setup_impedance_mode = impedance_setup_stub

    rx.setup_receive_mode(DataMode.IMPEDANCE)
    assert rx.is_ready
def test_failed_setup_wave_receive_mode_is_not_ready():
    """``is_ready`` stays falsy when the connector fails to set up wave mode."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # The connector's wave-mode setup is stubbed to report failure.
    failing_setup = mock.MagicMock(return_value=False)
    rx.connector.setup_wave_mode = failing_setup

    rx.setup_receive_mode(DataMode.WAVE)
    assert not rx.is_ready
def test_failed_setup_impedance_receive_mode_is_not_ready():
    """``is_ready`` stays falsy when impedance-mode setup fails."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # The connector's impedance-mode setup is stubbed to report failure.
    failing_setup = mock.MagicMock(return_value=False)
    rx.connector.setup_impedance_mode = failing_setup

    rx.setup_receive_mode(DataMode.IMPEDANCE)
    assert not rx.is_ready
def test_before_ready_stop_receive_pass():
    """``stop_receive`` on a brand-new receiver returns without raising."""
    # No assertion needed: an exception here would fail the test.
    Receiver().stop_receive()
def test_after_stop_receive_is_not_ready():
    """``is_ready`` is falsy again after ``stop_receive`` follows a setup."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)

    # Bring the receiver up with a connector stubbed to report ready.
    ready_stub = mock.MagicMock(return_value=True)
    rx.connector.get_ready = ready_stub
    rx.setup_connector()

    # Stop it again; the connector's stop() is stubbed to return True.
    stop_stub = mock.MagicMock(return_value=True)
    rx.connector.stop = stop_stub
    rx.stop_receive()

    assert not rx.is_ready
def test_receiver_singleton_keep_status():
    """``is_ready`` set via one ``Receiver()`` is visible through the next."""
    first = Receiver()
    first.is_ready = True
    # A second constructor call must observe the flag set above.
    assert Receiver().is_ready
def test_change_wave_to_impedance_mode_success():
    """Switching to impedance mode after a setup/stop cycle returns truthy.

    ``clear_all_buffer`` and the connector methods involved are replaced by
    mocks, so no real buffer or connector work is performed.
    """
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)
    rx.clear_all_buffer = mock.MagicMock()

    # Set the connector up, with readiness stubbed to True.
    ready_stub = mock.MagicMock(return_value=True)
    rx.connector.get_ready = ready_stub
    rx.setup_connector()

    # Stop receiving, with the connector's stop() stubbed to True.
    stop_stub = mock.MagicMock(return_value=True)
    rx.connector.stop = stop_stub
    rx.stop_receive()

    # Switch modes, with the impedance setup stubbed to succeed.
    impedance_setup_stub = mock.MagicMock(return_value=True)
    rx.connector.setup_impedance_mode = impedance_setup_stub
    assert rx.setup_receive_mode(DataMode.IMPEDANCE)
def test_after_setup_connector_buffers_are_cleared():
    """``setup_connector`` calls ``clear_all_buffer`` on the receiver."""
    rx = Receiver()

    # Fully mocked connector whose get_ready() returns True.
    fake_connector = mock.MagicMock()
    fake_connector.get_ready = mock.MagicMock(return_value=True)
    rx.connector = fake_connector

    # Spy installed in place of the real buffer-clearing method.
    clear_spy = mock.MagicMock()
    rx.clear_all_buffer = clear_spy

    rx.setup_connector()
    assert clear_spy.called
def test_after_reset_receive_mode_buffers_are_cleared():
    """``setup_receive_mode`` calls ``clear_all_buffer`` on the receiver."""
    rx = Receiver()

    # Fully mocked connector with an explicit mock for setup_wave_mode.
    fake_connector = mock.MagicMock(setup_wave_mode=mock.MagicMock())
    rx.connector = fake_connector

    # Spy installed in place of the real buffer-clearing method.
    clear_spy = mock.MagicMock()
    rx.clear_all_buffer = clear_spy

    rx.setup_receive_mode(DataMode.WAVE)
    assert clear_spy.called
def test_get_data_from_invalid_buffer_type_failed():
    """An unknown buffer type makes ``get_data_from_buffer`` assert.

    ``is_ready`` is forced to ``True`` first, so the receiver is flagged as
    ready when the invalid type ``'xxx'`` is requested.
    """
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)
    rx.is_ready = True

    with pytest.raises(AssertionError):
        rx.get_data_from_buffer('xxx')
def test_get_data_from_buffer_success():
    """A ready receiver returns what the plot buffer's ``get_sig`` yields."""
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)
    rx.is_ready = True

    # Payload handed out by the stubbed plot buffer.
    expected = {'status': 'ok', 'data': 1}
    rx.buffer_plot.get_sig = mock.MagicMock(return_value=expected)

    assert rx.get_data_from_buffer('plot') == expected
# Skip reason, translated: adding a timeout mechanism caused stuttering, so
# the feature was removed.
@unittest.skip('加入timeout机制会导致卡顿,因此删除此功能')
def test_limit_time_to_get_data_from_buffer():
    """(Skipped) A buffer that only answers ``'warn'`` triggers a timeout.

    Expects ``get_data_from_buffer('plot')`` to raise ``FunctionTimedOut``
    when the plot buffer's ``get_sig`` keeps returning a ``'warn'`` status.
    """
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)
    rx.is_ready = True

    warn_stub = mock.MagicMock(return_value={'status': 'warn'})
    rx.buffer_plot.get_sig = warn_stub

    with pytest.raises(FunctionTimedOut):
        rx.get_data_from_buffer('plot')
# Skip reason, translated: depends on hardware.
@unittest.skip('依赖硬件')
def test_main():
    """(Skipped) Manual end-to-end run: wave data first, then impedance.

    Makes no assertions; it only prints whatever the receiver returns. Each
    phase polls 20 times with a one-second sleep before every poll.
    """
    rx = Receiver()
    rx.select_connector(Device.NEO, 1)
    if not rx.setup_connector():
        # Nothing else is attempted when the connector setup fails.
        return

    # Phase 1: stream wave data and print what lands in the plot buffer.
    rx.start_receive_wave()
    for _ in range(20):
        time.sleep(1)
        plot_payload = rx.get_data_from_buffer('plot')
        if plot_payload:
            print(plot_payload['data'])
    rx.stop_receive()

    # Phase 2: switch to impedance mode and print each non-empty reading.
    rx.setup_receive_mode(DataMode.IMPEDANCE)
    for _ in range(20):
        time.sleep(1)
        reading = rx.receive_impedance()
        if reading:
            print(reading)
|