2
0

viz.py 7.9 KB


  1. import mne
  2. from mne.time_frequency import psd_array_multitaper
  3. import matplotlib.pyplot as plt
  4. import matplotlib as mpl
  5. import numpy as np
  6. from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
  7. from scipy.ndimage import gaussian_filter
  8. from scipy import signal
  9. from .utils import cut_epochs, apply_baseline
  10. from .feature_extractors import filterbank_extractor
  11. def plot_embeddings(embd, y, event_id, size=20, figsize=(4, 5), show_legend=True, colors=None, alpha=1):
  12. fig, ax = plt.subplots(figsize=figsize)
  13. for label in event_id.keys():
  14. l = event_id[label]
  15. idx = y == l
  16. if colors is not None:
  17. ax.scatter(embd[idx, 0], embd[idx, 1], s=size, label=label, color=colors[label], alpha=alpha)
  18. else:
  19. ax.scatter(embd[idx, 0], embd[idx, 1], s=size, label=label, alpha=alpha)
  20. ax.set_xlabel(r"PC1")
  21. ax.set_ylabel(r"PC2")
  22. if show_legend:
  23. ax.legend(frameon=False)
  24. ax.spines['top'].set_visible(False)
  25. ax.spines['right'].set_visible(False)
  26. return fig
  27. def plot_hg_envelope(raw, events, event_id, fs, freqs, tmin, tmax, t_smooth=0.3, target_event='flex'):
  28. power = raw.filter(*freqs).apply_hilbert(envelope=True).get_data()
  29. # moving average
  30. n_smooth = int(t_smooth * fs)
  31. # smooth
  32. power = signal.filtfilt(np.ones(n_smooth) / n_smooth, 1, power, axis=-1)
  33. epochs_hg = cut_epochs((tmin, tmax, fs), power, events[:, 0])
  34. epochs_hg = apply_baseline((tmin, tmax, fs), epochs_hg)
  35. times = np.linspace(tmin, tmax, epochs_hg.shape[-1])
  36. move_average = epochs_hg[events[:, 2] == event_id[target_event]].mean(axis=0)
  37. move_se = epochs_hg[events[:, 2] == event_id[target_event]].std(axis=0) / np.sqrt(np.sum(events[:, 2] == event_id[target_event]))
  38. n_ch = power.shape[0]
  39. fig, axes = plt.subplots(1, 1)
  40. for i in range(n_ch):
  41. axes.plot(times, move_average[i], label=f'ch_{i + 1}')
  42. axes.fill_between(times, move_average[i] - move_se[i], move_average[i] + move_se[i], alpha=0.1)
  43. axes.legend()
  44. fig.suptitle(f'HG line plot ({freqs[0], freqs[1]}))')
  45. return fig
  46. def snapshot_brain(fig_3d, info, data=None, show_name=False):
  47. if data is not None:
  48. cmap = mpl.cm.viridis
  49. norm = mpl.colors.Normalize(vmin=data.min(), vmax=data.max())
  50. mappable = mpl.cm.ScalarMappable(norm=norm, cmap=cmap)
  51. directions = [0, 180] # right, left
  52. figs = []
  53. for d in directions:
  54. # right
  55. mne.viz.set_3d_view(fig_3d, azimuth=d, elevation=70)
  56. xy, im = mne.viz.snapshot_brain_montage(fig_3d, info, hide_sensors=False)
  57. fig, ax = plt.subplots(figsize=(5, 5))
  58. ax.imshow(im, interpolation='none')
  59. ax.set_axis_off()
  60. if data is not None:
  61. fig.subplots_adjust(right=0.8)
  62. cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7])
  63. fig.colorbar(mappable, cax=cbar_ax)
  64. if show_name:
  65. xy_pts = np.vstack([xy[ch] for ch in info["ch_names"]])
  66. for i, pos in enumerate(xy_pts):
  67. ax.text(*pos, i, color='white')
  68. figs.append(fig)
  69. return figs
  70. def plot_raw_tfr(data, sfreq, freqs, n_cycles=14):
  71. # extract power, (n_ch, n_freqs, n_times)
  72. power = mne.time_frequency.tfr_array_morlet(data[None], sfreq, freqs, output='avg_power', n_cycles=n_cycles).squeeze()
  73. # power = filterbank_extractor(data, sfreq, freqs, reshape_freqs_dim=False)
  74. power = 10 * np.log10(power)
  75. # normalize by freqs
  76. power -= power.mean(axis=(0, 2), keepdims=True)
  77. power = gaussian_filter(power, 200, axes=0)
  78. fig, axes = plt.subplots(figsize=(10, 2))
  79. im = axes.imshow(power.mean(axis=0), cmap='RdBu_r',
  80. extent=[0, data.shape[-1] / sfreq, freqs[0], freqs[-1]],
  81. vmin=-6,
  82. vmax=6,
  83. aspect='auto',
  84. origin='lower')
  85. fig.colorbar(im)
  86. return fig
  87. def plot_cls_tfr(data, events, sfreq, freqs, epoch_time_range, event_desc):
  88. """
  89. data: numpy.ndarray, (n_ch, n_times)
  90. events: ndarray (n_events, 3), the first column is onset index, the second is duration, and the third is event type
  91. freqs: numpy.ndarray, frequency bands to filter
  92. epoch_time_range: tuple, (t_onset, t_offset)
  93. event_desc: dict {id: name}
  94. """
  95. # extract power, (n_ch, n_freqs, n_times)
  96. power = filterbank_extractor(data, sfreq, freqs, reshape_freqs_dim=False)
  97. power = 10 * np.log10(power)
  98. # normalize by freqs
  99. power -= power.mean(axis=(0, 2), keepdims=True)
  100. power /= power.std(axis=(0, 2), keepdims=True)
  101. # image vlim
  102. mean_, std_ = power.mean(), power.std()
  103. # cut epochs
  104. epochs = cut_epochs((*epoch_time_range, sfreq), power, events[:, 0])
  105. # average by event type
  106. classes = np.unique(events[:, -1])
  107. fig, axes = plt.subplots(1, len(classes), figsize=(10, 5))
  108. for ax, y_ in zip(axes, classes):
  109. average_power = epochs[events[:, -1] == y_].mean(axis=(0, 1)) # keep freqencies and times
  110. im = ax.imshow(average_power, cmap='RdBu_r',
  111. extent=[*epoch_time_range, freqs[0], freqs[-1]],
  112. vmin=mean_ - 0.5 * std_,
  113. vmax=mean_ + 0.5 * std_,
  114. aspect='auto',
  115. origin='lower')
  116. ax.axvline(0, color='k')
  117. ax.set_title(event_desc[y_])
  118. fig.subplots_adjust(right=0.8)
  119. cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7])
  120. fig.colorbar(im, cax=cbar_ax)
  121. return fig
  122. def plot_ersd(data, events, sfreq, epoch_time_range, event_id, rest_event=0):
  123. n_ch = data.shape[0]
  124. event_desc = {v:k for k, v in event_id.items()}
  125. epochs = cut_epochs((*epoch_time_range, sfreq), data, events[:, 0])
  126. psd, freqs = psd_array_multitaper(epochs, sfreq, fmin=0, fmax=200, bandwidth=15)
  127. mean_psd_rest = psd[events[:, -1] == rest_event].mean(axis=0)
  128. ersds = []
  129. for e in np.unique(events[:, -1]):
  130. if e != rest_event:
  131. mean_psd = psd[events[:, -1] == e].mean(axis=0)
  132. ersd = mean_psd / mean_psd_rest - 1
  133. ersds.append((event_desc[e], ersd))
  134. fig, axes = plt.subplots(n_ch, len(ersds), figsize=(3 * len(ersds), n_ch), sharex=True, sharey=True)
  135. for i in range(n_ch):
  136. if len(ersds) == 1:
  137. for j, ersd in enumerate(ersds):
  138. if i == 0:
  139. axes[i].set_title(ersd[0])
  140. axes[i].plot(freqs, ersd[1][i])
  141. axes[i].set_ylabel(f'ch_{i + 1}')
  142. axes[i].axhline(0, color='gray', linestyle='--')
  143. else:
  144. for j, ersd in enumerate(ersds):
  145. if i == 0:
  146. axes[i, j].set_title(ersd[0])
  147. axes[i, j].plot(freqs, ersd[1][i])
  148. axes[i, j].set_ylabel(f'ch_{i + 1}')
  149. axes[i, j].axhline(0, color='gray', linestyle='--')
  150. fig.suptitle('ERSD')
  151. return fig
  152. def plot_confusion_matrix(y_true, y_pred):
  153. cm = confusion_matrix(y_true, y_pred, normalize='true')
  154. disp = ConfusionMatrixDisplay(cm)
  155. disp.plot()
  156. return disp.figure_
  157. def plot_states(time_range, pred_states, ax, colors=None):
  158. classes = np.unique(pred_states)
  159. if colors is None:
  160. colors = [plt.get_cmap('tab10')(i)[:3] for i in range(len(classes))]
  161. for i, c in enumerate(classes):
  162. ax.fill_between(np.linspace(*time_range, len(pred_states)), 0, 1,
  163. where=(pred_states == c), alpha=0.6, color=colors[i])
  164. return ax
  165. def plot_state_prob_with_cue(time_range, true_states, pred_probs, ax, colors=None):
  166. # normalize
  167. ax.plot(np.linspace(*time_range, len(pred_probs)), pred_probs, color='k')
  168. # for each class, fill different colors
  169. classes = np.unique(true_states)
  170. if colors is None:
  171. colors = [plt.get_cmap('tab10')(i)[:3] for i in range(len(classes))]
  172. for i, c in enumerate(classes):
  173. ax.fill_between(np.linspace(*time_range, len(true_states)), 0, 1,
  174. where=(true_states == c), alpha=0.6, color=colors[i])
  175. return ax