"""环形buffer,用来缓存一定长度的数据""" import collections import itertools import math import mne import numpy as np from core.sig_chain.device.montage_base_model import MontageBase from core.sig_chain.utils import Observer class ParserNewset(): """策略方法类 """ def parser_newset(self, package_num, content, mbm_created, dataformat="mne"): """策略类方法接口 Args: package_num (int): 包的数量 content (dqueue): 数据队列 mbm_created (class): mne 的info dataformat (str, optional): 数据类型,默认为mne格式,目前dataformat为任意其它 值都会返回nparray格式. Defaults to "mne". """ pass class ParserNewsetWithTime(ParserNewset): """类策略方法:解析有时间戳的数据 Args: ParserNewset (class): 父类 """ def parser_newset(self, package_num, content, mbm_created, dataformat="mne"): """解析数据有时间戳的数据 Args: package_num (int): 包数量 content (class): dqueue mbm_created (class): mne 的info dataformat (str, optional): 数据类型,默认为mne格式,目前dataformat为任意其它 值都会返回nparray格式. Defaults to "mne". Returns: dict: 返回数据和状态和时间戳 """ status = "unknown" if content and len(content) >= package_num: data_list = [] time_list = [] for con in list(content): data_list.append(con.data) time_list.append(con.timestamp) signals = np.concatenate(data_list, axis=1) status = "ok" raw_data = mne.io.RawArray( signals, mbm_created.info) if dataformat == "mne" else signals return {"status": status, "data": raw_data, "timestamp": time_list} else: return {"status": "warn", "data": None, "timestamp": None} class PaserNewsetWithoutTime(ParserNewset): """类策略方法:解析没有时间戳的数据 Args: ParserNewset (class): 父类 """ def parser_newset(self, package_num, content, mbm_created, dataformat="mne"): """解析数据没有时间戳的数据 Args: package_num (int): 包数量 content (class): dqueue mbm_created (class): mne 的info dataformat (str, optional): 数据类型,默认为mne格式,目前dataformat为任意其它 值都会返回nparray格式. Defaults to "mne". Returns: dict: 返回数据和状态 """ status = "unknown" if content and len(content) >= package_num: signals = np.concatenate(tuple( list(itertools.islice(content, 0, None))), axis=1) status = "ok" raw_data = mne.io.RawArray( signals, mbm_created.info) if dataformat == "mne" else signals return {"status": status, "data": raw_data} else: return {"status": "warn", "data": None} class CircularBuffer(Observer): """环形buffer类""" def __init__(self, data_len, package_len, chan_labels, chan_types, fs, parser): """初始化一个环形buffer Args: data_len (float): 数据长度,以秒为单位,例如要缓存20s的数据,data_len值为20 package_len (float): 包长度,以秒为单位,例如设备每100ms发送一个包,则package_len值为0.1 chan_labels (List[str]): 导联标签 chan_types (List[str]): 可以是任意str,一般写为"eeg"即可,注意要对每个chan_labels都定义 fs (float): 采样率 parser (class): 数据解析,来自于ParserNewset的类策略 """ self.data_len = data_len self.package_len = package_len self.package_num = math.ceil(self.data_len / self.package_len) self.chan_labels = chan_labels self.fs = fs self.content = collections.deque(maxlen=self.package_num) self.mbm_created = MontageBase(chan_labels, chan_types, fs) self._shape_status = {"ok": "ok", "warn": "warn"} self.parser = parser def update(self, newset): """更新buffer中的数据 Args: newset (np array or other): 设备定时发来的数据,一般为chan_count*samples的二维矩阵 """ # if newset.any(): # if newset: self.content.append(newset) # else: # pass def get_sig(self, dataformat="mne", clear=True): """获得数据并转为mne格式 Args: dataformat (str): 数据类型,默认为mne格式,目前dataformat为任意其它值都会返回nparray格式 clear (bool): 是否清空buffer的标志,默认为清空 Returns: dict: 一个字典,"status"表示得到的数据维度是否正确,"ok"表示正确,"warn"表示维度和预期不相符; "data"默认为mne格式,也可以为nparray,根据策略方法不同,需要时也会有时间戳的输出 """ ret = self.parser.parser_newset(self.package_num, self.content, self.mbm_created, dataformat) if ret["status"] == "ok" and clear: self.content.clear() return ret