123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- """环形buffer,用来缓存一定长度的数据"""
- import collections
- import itertools
- import math
- import mne
- import numpy as np
- from core.sig_chain.device.montage_base_model import MontageBase
- from core.sig_chain.utils import Observer
- class ParserNewset():
- """策略方法类
- """
- def parser_newset(self,
- package_num,
- content,
- mbm_created,
- dataformat="mne"):
- """策略类方法接口
- Args:
- package_num (int): 包的数量
- content (dqueue): 数据队列
- mbm_created (class): mne 的info
- dataformat (str, optional): 数据类型,默认为mne格式,目前dataformat为任意其它
- 值都会返回nparray格式. Defaults to "mne".
- """
- pass
- class ParserNewsetWithTime(ParserNewset):
- """类策略方法:解析有时间戳的数据
- Args:
- ParserNewset (class): 父类
- """
- def parser_newset(self,
- package_num,
- content,
- mbm_created,
- dataformat="mne"):
- """解析数据有时间戳的数据
- Args:
- package_num (int): 包数量
- content (class): dqueue
- mbm_created (class): mne 的info
- dataformat (str, optional): 数据类型,默认为mne格式,目前dataformat为任意其它
- 值都会返回nparray格式. Defaults to "mne".
- Returns:
- dict: 返回数据和状态和时间戳
- """
- status = "unknown"
- if content and len(content) >= package_num:
- data_list = []
- time_list = []
- for con in list(content):
- data_list.append(con.data)
- time_list.append(con.timestamp)
- signals = np.concatenate(data_list, axis=1)
- status = "ok"
- raw_data = mne.io.RawArray(
- signals, mbm_created.info) if dataformat == "mne" else signals
- return {"status": status, "data": raw_data, "timestamp": time_list}
- else:
- return {"status": "warn", "data": None, "timestamp": None}
- class PaserNewsetWithoutTime(ParserNewset):
- """类策略方法:解析没有时间戳的数据
- Args:
- ParserNewset (class): 父类
- """
- def parser_newset(self,
- package_num,
- content,
- mbm_created,
- dataformat="mne"):
- """解析数据没有时间戳的数据
- Args:
- package_num (int): 包数量
- content (class): dqueue
- mbm_created (class): mne 的info
- dataformat (str, optional): 数据类型,默认为mne格式,目前dataformat为任意其它
- 值都会返回nparray格式. Defaults to "mne".
- Returns:
- dict: 返回数据和状态
- """
- status = "unknown"
- if content and len(content) >= package_num:
- signals = np.concatenate(tuple(
- list(itertools.islice(content, 0, None))),
- axis=1)
- status = "ok"
- raw_data = mne.io.RawArray(
- signals, mbm_created.info) if dataformat == "mne" else signals
- return {"status": status, "data": raw_data}
- else:
- return {"status": "warn", "data": None}
- class CircularBuffer(Observer):
- """环形buffer类"""
- def __init__(self, data_len, package_len, chan_labels, chan_types, fs,
- parser):
- """初始化一个环形buffer
- Args:
- data_len (float): 数据长度,以秒为单位,例如要缓存20s的数据,data_len值为20
- package_len (float): 包长度,以秒为单位,例如设备每100ms发送一个包,则package_len值为0.1
- chan_labels (List[str]): 导联标签
- chan_types (List[str]): 可以是任意str,一般写为"eeg"即可,注意要对每个chan_labels都定义
- fs (float): 采样率
- parser (class): 数据解析,来自于ParserNewset的类策略
- """
- self.data_len = data_len
- self.package_len = package_len
- self.package_num = math.ceil(self.data_len / self.package_len)
- self.chan_labels = chan_labels
- self.fs = fs
- self.content = collections.deque(maxlen=self.package_num)
- self.mbm_created = MontageBase(chan_labels, chan_types, fs)
- self._shape_status = {"ok": "ok", "warn": "warn"}
- self.parser = parser
- def update(self, newset):
- """更新buffer中的数据
- Args:
- newset (np array or other): 设备定时发来的数据,一般为chan_count*samples的二维矩阵
- """
- # if newset.any():
- # if newset:
- self.content.append(newset)
- # else:
- # pass
- def get_sig(self, dataformat="mne", clear=True):
- """获得数据并转为mne格式
- Args:
- dataformat (str): 数据类型,默认为mne格式,目前dataformat为任意其它值都会返回nparray格式
- clear (bool): 是否清空buffer的标志,默认为清空
- Returns:
- dict: 一个字典,"status"表示得到的数据维度是否正确,"ok"表示正确,"warn"表示维度和预期不相符;
- "data"默认为mne格式,也可以为nparray,根据策略方法不同,需要时也会有时间戳的输出
- """
- ret = self.parser.parser_newset(self.package_num, self.content,
- self.mbm_created, dataformat)
- if ret["status"] == "ok" and clear:
- self.content.clear()
- return ret
|