123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- """单元测试sig_buffer"""
- import collections
- import numpy as np
- from device.sig_chain.sig_buffer import CircularBuffer
- from device.sig_chain.sig_buffer import ParserNewsetWithTime
- from device.sig_chain.sig_buffer import PaserNewsetWithoutTime
- TimeStamp = collections.namedtuple("Time", ["timestamp", "data"])
- data_len = 10
- package_len = 0.1
- sig_len = 0.5
- chan_labels = ["C3", "C4", "O1", "O2", "Oz"]
- chan_types = ["eeg"] * len(chan_labels)
- fs = 1000
- sig_mock = np.random.rand(len(chan_labels), int(package_len * fs))
- sig_mock_time = TimeStamp(2023, sig_mock)
- def test_update_is_success():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- PaserNewsetWithoutTime())
- ring_len = len(ring.content)
- ring.update(sig_mock)
- assert len(ring.content) == ring_len + 1
- def test_ring_is_full():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- PaserNewsetWithoutTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock)
- assert len(ring.content) == data_len / package_len
- def test_ring_get_sig():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- PaserNewsetWithoutTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock)
- _ = ring.get_sig()
- assert len(ring.content) == 0
- def test_enough_ring_get_data_status_is_ok():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- PaserNewsetWithoutTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock)
- data_get = ring.get_sig()
- status, data = data_get.values()
- assert data.get_data().shape == (len(chan_labels), ring.data_len * fs)
- assert status == "ok"
- def test_not_enough_ring_get_data_status_is_warn():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- PaserNewsetWithoutTime())
- ring.update(sig_mock)
- data_get = ring.get_sig()
- status, data = data_get.values()
- assert data is None
- assert status == "warn"
- def test_update_is_success_time():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- ring_len = len(ring.content)
- ring.update(sig_mock_time)
- assert len(ring.content) == ring_len + 1
- def test_ring_is_full_time():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock_time)
- assert len(ring.content) == data_len / package_len
- def test_ring_get_sig_time():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock_time)
- _ = ring.get_sig()
- assert len(ring.content) == 0
- def test_enough_ring_get_data_status_is_ok_time():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock_time)
- data_get = ring.get_sig()
- status = data_get["status"]
- data = data_get["data"]
- my_time_stamp = data_get["timestamp"]
- assert data.get_data().shape == (len(chan_labels), ring.data_len * fs)
- assert status == "ok"
- assert my_time_stamp[0] == 2023
- def test_not_enough_ring_get_data_status_is_warn_time():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- ring.update(sig_mock_time)
- data_get = ring.get_sig()
- status = data_get["status"]
- data = data_get["data"]
- my_time_stamp = data_get["timestamp"]
- assert data is None
- assert status == "warn"
- assert my_time_stamp is None
- def test_get_sig_with_clear():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock_time)
- ring.get_sig(clear=True)
- assert len(ring.content) == 0
- def test_get_sig_without_clear():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- for _ in range(0, int(data_len / package_len)):
- ring.update(sig_mock_time)
- ring.get_sig(clear=False)
- assert len(ring.content) == len(ring.content)
- def test_not_enough_ring_get_sig_without_clear():
- ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs,
- ParserNewsetWithTime())
- ring.update(sig_mock_time)
- ring.get_sig(clear=False)
- assert len(ring.content) == 1
|