test_sig_buffer.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. """单元测试sig_buffer"""
  2. import collections
  3. import numpy as np
  4. from device.sig_chain.sig_buffer import CircularBuffer
  5. from device.sig_chain.sig_buffer import ParserNewsetWithTime
  6. from device.sig_chain.sig_buffer import PaserNewsetWithoutTime
  7. TimeStamp = collections.namedtuple("Time", ["timestamp", "data"])
  8. data_len = 10
  9. package_len = 0.1
  10. sig_len = 0.5
  11. chan_labels = ["C3", "C4", "O1", "O2", "Oz"]
  12. chan_types = ["eeg"] * len(chan_labels)
  13. fs = 1000
  14. sig_mock = np.random.rand(len(chan_labels), int(package_len * fs))
  15. sig_mock_time = TimeStamp(2023, sig_mock)
  16. def test_update_is_success():
  17. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  18. PaserNewsetWithoutTime())
  19. ring_len = len(ring.content)
  20. ring.update(sig_mock)
  21. assert len(ring.content) == ring_len + 1
  22. def test_ring_is_full():
  23. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  24. PaserNewsetWithoutTime())
  25. for _ in range(0, int(data_len / package_len)):
  26. ring.update(sig_mock)
  27. assert len(ring.content) == data_len / package_len
  28. def test_ring_get_sig():
  29. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  30. PaserNewsetWithoutTime())
  31. for _ in range(0, int(data_len / package_len)):
  32. ring.update(sig_mock)
  33. _ = ring.get_sig()
  34. assert len(ring.content) == 0
  35. def test_enough_ring_get_data_status_is_ok():
  36. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  37. PaserNewsetWithoutTime())
  38. for _ in range(0, int(data_len / package_len)):
  39. ring.update(sig_mock)
  40. data_get = ring.get_sig()
  41. status, data = data_get.values()
  42. assert data.get_data().shape == (len(chan_labels), ring.data_len * fs)
  43. assert status == "ok"
  44. def test_not_enough_ring_get_data_status_is_warn():
  45. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  46. PaserNewsetWithoutTime())
  47. ring.update(sig_mock)
  48. data_get = ring.get_sig()
  49. status, data = data_get.values()
  50. assert data is None
  51. assert status == "warn"
  52. def test_update_is_success_time():
  53. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  54. ParserNewsetWithTime())
  55. ring_len = len(ring.content)
  56. ring.update(sig_mock_time)
  57. assert len(ring.content) == ring_len + 1
  58. def test_ring_is_full_time():
  59. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  60. ParserNewsetWithTime())
  61. for _ in range(0, int(data_len / package_len)):
  62. ring.update(sig_mock_time)
  63. assert len(ring.content) == data_len / package_len
  64. def test_ring_get_sig_time():
  65. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  66. ParserNewsetWithTime())
  67. for _ in range(0, int(data_len / package_len)):
  68. ring.update(sig_mock_time)
  69. _ = ring.get_sig()
  70. assert len(ring.content) == 0
  71. def test_enough_ring_get_data_status_is_ok_time():
  72. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  73. ParserNewsetWithTime())
  74. for _ in range(0, int(data_len / package_len)):
  75. ring.update(sig_mock_time)
  76. data_get = ring.get_sig()
  77. status = data_get["status"]
  78. data = data_get["data"]
  79. my_time_stamp = data_get["timestamp"]
  80. assert data.get_data().shape == (len(chan_labels), ring.data_len * fs)
  81. assert status == "ok"
  82. assert my_time_stamp[0] == 2023
  83. def test_not_enough_ring_get_data_status_is_warn_time():
  84. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  85. ParserNewsetWithTime())
  86. ring.update(sig_mock_time)
  87. data_get = ring.get_sig()
  88. status = data_get["status"]
  89. data = data_get["data"]
  90. my_time_stamp = data_get["timestamp"]
  91. assert data is None
  92. assert status == "warn"
  93. assert my_time_stamp is None
  94. def test_get_sig_with_clear():
  95. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  96. ParserNewsetWithTime())
  97. for _ in range(0, int(data_len / package_len)):
  98. ring.update(sig_mock_time)
  99. ring.get_sig(clear=True)
  100. assert len(ring.content) == 0
  101. def test_get_sig_without_clear():
  102. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  103. ParserNewsetWithTime())
  104. for _ in range(0, int(data_len / package_len)):
  105. ring.update(sig_mock_time)
  106. ring.get_sig(clear=False)
  107. assert len(ring.content) == len(ring.content)
  108. def test_not_enough_ring_get_sig_without_clear():
  109. ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
  110. ParserNewsetWithTime())
  111. ring.update(sig_mock_time)
  112. ring.get_sig(clear=False)
  113. assert len(ring.content) == 1