test_receive.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. """Module tests/core/sig_chain/test_receive provide test for receiver"""
  2. import pytest
  3. import time
  4. import unittest
  5. from unittest import mock
  6. from func_timeout import FunctionTimedOut
  7. from core.sig_chain.device.connector_interface import DataMode
  8. from core.sig_chain.device.connector_interface import Device
  9. from core.sig_chain.sig_receive import Receiver
  10. def teardown_function():
  11. Receiver.clear_instance()
  12. def test_new_receiver_is_not_ready():
  13. receiver = Receiver()
  14. assert not receiver.is_ready
  15. def test_before_select_can_not_setup_connector():
  16. receiver = Receiver()
  17. with pytest.raises(AssertionError):
  18. receiver.setup_connector()
  19. def test_before_setup_connector_receive_data_failed():
  20. receiver = Receiver()
  21. with pytest.raises(AssertionError):
  22. receiver.start_receive_wave()
  23. def test_before_setup_connector_get_data_from_buffer_failed():
  24. receiver = Receiver()
  25. receiver.select_connector(Device.NEO, 1)
  26. receiver.buffer_plot.get_sig = \
  27. mock.MagicMock(return_value={'status': 'ok'})
  28. with pytest.raises(RuntimeError):
  29. receiver.get_data_from_buffer('plot')
  30. receiver.buffer_classify_online.get_sig = \
  31. mock.MagicMock(return_value={'status': 'ok'})
  32. with pytest.raises(RuntimeError):
  33. receiver.get_data_from_buffer('classify_online')
  34. def test_before_setup_connector_stop_receive_pass():
  35. receiver = Receiver()
  36. receiver.select_connector(Device.NEO, 1)
  37. receiver.stop_receive()
  38. def test_after_setup_connector_is_ready():
  39. receiver = Receiver()
  40. receiver.select_connector(Device.NEO, 1)
  41. receiver.connector.get_ready = mock.MagicMock(return_value=True)
  42. receiver.setup_connector()
  43. assert receiver.is_ready
  44. def test_after_setup_wave_receive_mode_is_ready():
  45. receiver = Receiver()
  46. receiver.select_connector(Device.NEO, 1)
  47. receiver.connector.setup_wave_mode = mock.MagicMock(return_value=True)
  48. receiver.setup_receive_mode(DataMode.WAVE)
  49. assert receiver.is_ready
  50. def test_after_setup_impedance_receive_mode_is_ready():
  51. receiver = Receiver()
  52. receiver.select_connector(Device.NEO, 1)
  53. receiver.connector.setup_impedance_mode = \
  54. mock.MagicMock(return_value=True)
  55. receiver.setup_receive_mode(DataMode.IMPEDANCE)
  56. assert receiver.is_ready
  57. def test_failed_setup_wave_receive_mode_is_not_ready():
  58. receiver = Receiver()
  59. receiver.select_connector(Device.NEO, 1)
  60. receiver.connector.setup_wave_mode = mock.MagicMock(return_value=False)
  61. receiver.setup_receive_mode(DataMode.WAVE)
  62. assert not receiver.is_ready
  63. def test_failed_setup_impedance_receive_mode_is_not_ready():
  64. receiver = Receiver()
  65. receiver.select_connector(Device.NEO, 1)
  66. receiver.connector.setup_impedance_mode = mock.MagicMock(return_value=False)
  67. receiver.setup_receive_mode(DataMode.IMPEDANCE)
  68. assert not receiver.is_ready
  69. def test_before_ready_stop_receive_pass():
  70. receiver = Receiver()
  71. receiver.stop_receive()
  72. def test_after_stop_receive_is_not_ready():
  73. receiver = Receiver()
  74. receiver.select_connector(Device.NEO, 1)
  75. receiver.connector.get_ready = mock.MagicMock(return_value=True)
  76. receiver.setup_connector()
  77. receiver.connector.stop = mock.MagicMock(return_value=True)
  78. receiver.stop_receive()
  79. assert not receiver.is_ready
  80. def test_receiver_singleton_keep_status():
  81. receiver = Receiver()
  82. receiver.is_ready = True
  83. receiver = Receiver()
  84. assert receiver.is_ready
  85. def test_change_wave_to_impedance_mode_success():
  86. receiver = Receiver()
  87. receiver.select_connector(Device.NEO, 1)
  88. receiver.clear_all_buffer = mock.MagicMock()
  89. receiver.connector.get_ready = mock.MagicMock(return_value=True)
  90. receiver.setup_connector()
  91. receiver.connector.stop = mock.MagicMock(return_value=True)
  92. receiver.stop_receive()
  93. receiver.connector.setup_impedance_mode = mock.MagicMock(return_value=True)
  94. success = receiver.setup_receive_mode(DataMode.IMPEDANCE)
  95. assert success
  96. def test_after_setup_connector_buffers_are_cleared():
  97. receiver = Receiver()
  98. mock_connector = mock.MagicMock()
  99. mock_connector.get_ready.return_value = True
  100. receiver.connector = mock_connector
  101. mock_clear_all_buffer = mock.MagicMock()
  102. receiver.clear_all_buffer = mock_clear_all_buffer
  103. receiver.setup_connector()
  104. assert mock_clear_all_buffer.called
  105. def test_after_reset_receive_mode_buffers_are_cleared():
  106. receiver = Receiver()
  107. mock_connector = mock.MagicMock()
  108. mock_connector.setup_wave_mode = mock.MagicMock()
  109. receiver.connector = mock_connector
  110. mock_clear_all_buffer = mock.MagicMock()
  111. receiver.clear_all_buffer = mock_clear_all_buffer
  112. receiver.setup_receive_mode(DataMode.WAVE)
  113. assert mock_clear_all_buffer.called
  114. def test_get_data_from_invalid_buffer_type_failed():
  115. receiver = Receiver()
  116. receiver.select_connector(Device.NEO, 1)
  117. receiver.is_ready = True
  118. with pytest.raises(AssertionError):
  119. receiver.get_data_from_buffer('xxx')
  120. def test_get_data_from_buffer_success():
  121. receiver = Receiver()
  122. receiver.select_connector(Device.NEO, 1)
  123. receiver.is_ready = True
  124. mock_data = {'status': 'ok', 'data': 1}
  125. receiver.buffer_plot.get_sig = \
  126. mock.MagicMock(return_value=mock_data)
  127. ret = receiver.get_data_from_buffer('plot')
  128. assert ret == mock_data
  129. @unittest.skip('加入timeout机制会导致卡顿,因此删除此功能')
  130. def test_limit_time_to_get_data_from_buffer():
  131. receiver = Receiver()
  132. receiver.select_connector(Device.NEO, 1)
  133. receiver.is_ready = True
  134. receiver.buffer_plot.get_sig = \
  135. mock.MagicMock(return_value={'status': 'warn'})
  136. with pytest.raises(FunctionTimedOut):
  137. receiver.get_data_from_buffer('plot')
  138. @unittest.skip('依赖硬件')
  139. def test_main():
  140. receiver = Receiver()
  141. receiver.select_connector(Device.NEO, 1)
  142. if receiver.setup_connector():
  143. receiver.start_receive_wave()
  144. for _ in range(20):
  145. time.sleep(1)
  146. data_from_buffer = receiver.get_data_from_buffer('plot')
  147. if data_from_buffer:
  148. raw_data = data_from_buffer['data']
  149. print(raw_data)
  150. receiver.stop_receive()
  151. receiver.setup_receive_mode(DataMode.IMPEDANCE)
  152. for _ in range(20):
  153. time.sleep(1)
  154. impedance = receiver.receive_impedance()
  155. if impedance:
  156. print(impedance)