online_sim.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. '''
  2. 模型模拟在线测试脚本
  3. 在线模式测试:event f1-score and decision trace
  4. '''
  5. import numpy as np
  6. import matplotlib.pyplot as plt
  7. import mne
  8. import yaml
  9. import os
  10. import argparse
  11. import logging
  12. from sklearn.metrics import accuracy_score
  13. from dataloaders import neo
  14. import bci_core.online as online
  15. import bci_core.utils as bci_utils
  16. from settings.config import settings
# Configure the root logger at DEBUG level for the whole script.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Project configuration object, indexed by key; the script entry point
# reads its 'buffer_length' entry.
config_info = settings.CONFIG_INFO
  20. def parse_args():
  21. parser = argparse.ArgumentParser(
  22. description='Model validation'
  23. )
  24. parser.add_argument(
  25. '--subj',
  26. dest='subj',
  27. help='Subject name',
  28. default=None,
  29. type=str
  30. )
  31. parser.add_argument(
  32. '--state-change-threshold',
  33. '-scth',
  34. dest='state_change_threshold',
  35. help='Threshold for HMM state change',
  36. default=0.75,
  37. type=float
  38. )
  39. parser.add_argument(
  40. '--state-trans-prob',
  41. '-stp',
  42. dest='state_trans_prob',
  43. help='Transition probability for HMM state change',
  44. default=0.8,
  45. type=float
  46. )
  47. parser.add_argument(
  48. '--model-filename',
  49. dest='model_filename',
  50. help='Model filename',
  51. default=None,
  52. type=str
  53. )
  54. return parser.parse_args()
  55. class DataGenerator:
  56. def __init__(self, fs, X, epoch_step=1.):
  57. self.fs = int(fs)
  58. self.X = X
  59. self.epoch_step = epoch_step
  60. def get_data_batch(self, current_index):
  61. # return epoch_step length batch
  62. # create mne object
  63. ind = int(self.epoch_step * self.fs)
  64. data = self.X[:, current_index - ind:current_index].copy()
  65. return self.fs, [], data
  66. def loop(self, step_size=0.1):
  67. step = int(step_size * self.fs)
  68. for i in range(self.fs, self.X.shape[1] + 1, step):
  69. yield i / self.fs, self.get_data_batch(i)
  70. def _evaluation_loop(raw, events, model_hmm, epoch_length, step_length, event_trial_length):
  71. val_data = raw.get_data()
  72. fs = raw.info['sfreq']
  73. data_gen = DataGenerator(fs, val_data, epoch_step=epoch_length)
  74. decision_with_hmm = []
  75. decision_without_hmm = []
  76. probs = []
  77. for time, data in data_gen.loop(step_length):
  78. step_p, cls = model_hmm.viterbi(data, return_step_p=True)
  79. if cls >=0:
  80. cls = model_hmm.model.classes_[cls]
  81. decision_with_hmm.append((time, cls)) # map to unified label
  82. decision_without_hmm.append((time, model_hmm.model.classes_[np.argmax(step_p)]))
  83. probs.append((time, model_hmm.probability))
  84. probs = np.array(probs)
  85. events_pred = _construct_model_event(decision_with_hmm, fs)
  86. events_pred_naive = _construct_model_event(decision_without_hmm, fs)
  87. p_hmm, r_hmm, f1_hmm = bci_utils.event_metric(event_true=events, event_pred=events_pred, fs=fs)
  88. p_n, r_n, f1_n = bci_utils.event_metric(events, events_pred_naive, fs=fs)
  89. stim_true = _event_to_stim_channel(events, len(raw.times), trial_length=int(event_trial_length * fs))
  90. stim_pred = _event_to_stim_channel(events_pred, len(raw.times))
  91. stim_pred_naive = _event_to_stim_channel(events_pred_naive, len(raw.times))
  92. accu_hmm = accuracy_score(stim_true, stim_pred)
  93. accu_naive = accuracy_score(stim_true, stim_pred_naive)
  94. fig_pred, ax = plt.subplots(4, 1, sharex=True, sharey=False)
  95. ax[0].set_title('pred (naive)')
  96. ax[0].plot(raw.times, stim_pred_naive)
  97. ax[1].set_title('pred')
  98. ax[1].plot(raw.times, stim_pred)
  99. ax[2].set_title('true')
  100. ax[2].plot(raw.times, stim_true)
  101. ax[3].set_title('prob')
  102. ax[3].plot(probs[:, 0], probs[:, 1])
  103. ax[3].set_ylim([0, 1])
  104. return fig_pred, (p_hmm, r_hmm, f1_hmm, accu_hmm), (p_n, r_n, f1_n, accu_naive)
  105. def simulation(raw_val, event_id, model,
  106. epoch_length=1.,
  107. step_length=0.1,
  108. event_trial_length=5.):
  109. """模型验证接口,使用指定数据进行验证,绘制ersd map
  110. Args:
  111. raw (mne.io.Raw)
  112. event_id (dict)
  113. model: validate existing model,
  114. epoch_length (float): batch data length, default 1 (s)
  115. step_length (float): data step length, default 0.1 (s)
  116. event_trial_length (float):
  117. Returns:
  118. None
  119. """
  120. fs = raw_val.info['sfreq']
  121. events_val, _ = mne.events_from_annotations(raw_val, event_id)
  122. # run with and without hmm
  123. fig_pred, metric_hmm, metric_naive = _evaluation_loop(raw_val,
  124. events_val,
  125. model,
  126. epoch_length,
  127. step_length,
  128. event_trial_length=event_trial_length)
  129. return metric_hmm, metric_naive, fig_pred
  130. def _construct_model_event(decision_seq, fs):
  131. events = []
  132. for i in decision_seq:
  133. time, cls = i
  134. if cls >= 0:
  135. events.append([int(time * fs), 0, cls])
  136. return np.array(events)
  137. def _event_to_stim_channel(events, time_length, trial_length=None):
  138. x = np.zeros(time_length)
  139. for i in range(0, len(events) - 1):
  140. if trial_length is not None:
  141. x[events[i, 0]: events[i, 0] + trial_length] = events[i, 2]
  142. else:
  143. x[events[i, 0]: events[i + 1, 0] - 1] = events[i, 2]
  144. return x
  145. if __name__ == '__main__':
  146. args = parse_args()
  147. subj_name = args.subj
  148. data_dir = f'./data/{subj_name}/'
  149. model_path = f'./static/models/{subj_name}/{args.model_filename}'
  150. with open(os.path.join(data_dir, 'val_info.yml'), 'r') as f:
  151. info = yaml.safe_load(f)
  152. sessions = info['sessions']
  153. # preprocess raw
  154. trial_time = 5.
  155. raw, event_id = neo.raw_loader(data_dir, sessions,
  156. ori_epoch_length=trial_time,
  157. upsampled_epoch_length=None)
  158. # load model
  159. input_kwargs = {
  160. 'state_trans_prob': args.state_trans_prob,
  161. 'state_change_threshold': args.state_change_threshold
  162. }
  163. model_hmm = online.model_loader(model_path, **input_kwargs)
  164. # do validations
  165. metric_hmm, metric_naive, fig_pred = simulation(raw,
  166. event_id,
  167. model=model_hmm,
  168. epoch_length=config_info['buffer_length'],
  169. step_length=config_info['buffer_length'],
  170. event_trial_length=trial_time)
  171. fig_pred.savefig(os.path.join(data_dir, 'pred.pdf'))
  172. logger.info(f'With HMM: precision: {metric_hmm[0]:.4f}, recall: {metric_hmm[1]:.4f}, f1_score: {metric_hmm[2]:.4f}, accuracy: {metric_hmm[3]:.4f}')
  173. logger.info(f'Without HMM: precision: {metric_naive[0]:.4f}, recall: {metric_naive[1]:.4f}, f1_score: {metric_naive[2]:.4f}, accuracy: {metric_naive[3]:.4f}')