"""单元测试sig_buffer""" import collections import numpy as np from device.sig_chain.sig_buffer import CircularBuffer from device.sig_chain.sig_buffer import ParserNewsetWithTime from device.sig_chain.sig_buffer import PaserNewsetWithoutTime TimeStamp = collections.namedtuple("Time", ["timestamp", "data"]) data_len = 10 package_len = 0.1 sig_len = 0.5 chan_labels = ["C3", "C4", "O1", "O2", "Oz"] chan_types = ["eeg"] * len(chan_labels) fs = 1000 sig_mock = np.random.rand(len(chan_labels), int(package_len * fs)) sig_mock_time = TimeStamp(2023, sig_mock) def test_update_is_success(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, PaserNewsetWithoutTime()) ring_len = len(ring.content) ring.update(sig_mock) assert len(ring.content) == ring_len + 1 def test_ring_is_full(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, PaserNewsetWithoutTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock) assert len(ring.content) == data_len / package_len def test_ring_get_sig(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, PaserNewsetWithoutTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock) _ = ring.get_sig() assert len(ring.content) == 0 def test_enough_ring_get_data_status_is_ok(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, PaserNewsetWithoutTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock) data_get = ring.get_sig() status, data = data_get.values() assert data.get_data().shape == (len(chan_labels), ring.data_len * fs) assert status == "ok" def test_not_enough_ring_get_data_status_is_warn(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, PaserNewsetWithoutTime()) ring.update(sig_mock) data_get = ring.get_sig() status, data = data_get.values() assert data is None assert status == "warn" def test_update_is_success_time(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) ring_len = len(ring.content) ring.update(sig_mock_time) assert len(ring.content) == ring_len + 1 def test_ring_is_full_time(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock_time) assert len(ring.content) == data_len / package_len def test_ring_get_sig_time(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock_time) _ = ring.get_sig() assert len(ring.content) == 0 def test_enough_ring_get_data_status_is_ok_time(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock_time) data_get = ring.get_sig() status = data_get["status"] data = data_get["data"] my_time_stamp = data_get["timestamp"] assert data.get_data().shape == (len(chan_labels), ring.data_len * fs) assert status == "ok" assert my_time_stamp[0] == 2023 def test_not_enough_ring_get_data_status_is_warn_time(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) ring.update(sig_mock_time) data_get = ring.get_sig() status = data_get["status"] data = data_get["data"] my_time_stamp = data_get["timestamp"] assert data is None assert status == "warn" assert my_time_stamp is None def test_get_sig_with_clear(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock_time) ring.get_sig(clear=True) assert len(ring.content) == 0 def test_get_sig_without_clear(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) for _ in range(0, int(data_len / package_len)): ring.update(sig_mock_time) ring.get_sig(clear=False) assert len(ring.content) == len(ring.content) def test_not_enough_ring_get_sig_without_clear(): ring = CircularBuffer(data_len, package_len, chan_labels, chan_types, fs, ParserNewsetWithTime()) ring.update(sig_mock_time) ring.get_sig(clear=False) assert len(ring.content) == 1