import numpy as np import itertools from sklearn.model_selection import KFold from sklearn.metrics import roc_auc_score import logging import os logger = logging.getLogger(__name__) def parse_model_type(model_path): model_path = os.path.normpath(model_path) file_name = model_path.split(os.sep)[-1] model_type, events, _ = file_name.split('_') events = events.split('+') return model_type.lower(), events def event_metric(event_true, event_pred, fs, hit_time_range=(0, 3), ignore_event=(0,), f_beta=1.): """评价单试次f_alpha score Args: event_true: event_pred: fs: hit_time_range (tuple): ignore_event (tuple): ignore certain events f_beta (float): f_(alpha) score Return: f_beta score (float): f_alpha score """ event_true = event_true.copy()[np.logical_not(np.isin(event_true[:, 2], ignore_event))] event_pred = event_pred.copy()[np.logical_not(np.isin(event_pred[:, 2], ignore_event))] true_idx = 0 pred_idx = 0 correct_count = 0 hit_time_range = (int(fs * hit_time_range[0]), int(fs * hit_time_range[1])) while true_idx < len(event_true) and pred_idx < len(event_pred): if event_true[true_idx, 0] + hit_time_range[0] <= event_pred[pred_idx, 0] < event_true[true_idx, 0] + hit_time_range[1]: if event_true[true_idx, 2] == event_pred[pred_idx, 2]: correct_count += 1 true_idx += 1 pred_idx += 1 else: pred_idx += 1 elif event_pred[pred_idx, 0] < event_true[true_idx, 0] + hit_time_range[0]: pred_idx += 1 else: true_idx += 1 precision = correct_count / len(event_pred) recall = correct_count / len(event_true) fbeta_score = (1 + f_beta ** 2) * (precision * recall) / (f_beta ** 2 * precision + recall) return precision, recall, fbeta_score def cut_epochs(t, data, timestamps): """ cutting raw data into epochs :param t: tuple (start, end, samplerate) :param data: ndarray (..., n_times), the last dimension should be the times :param timestamps: list of timestamps :return: ndarray (n_epochs, ... , n_times), the first dimension be the epochs """ timestamps = np.array(timestamps) start = timestamps + int(t[0] * t[2]) end = timestamps + int(t[1] * t[2]) # do boundary check if start[0] < 0: start = start[1:] end = end[1:] if end[-1] > data.shape[-1]: start = start[:-1] end = end[:-1] epochs = np.stack([data[..., s:e] for s, e in zip(start, end)], axis=0) return epochs def product_dict(**kwargs): keys = kwargs.keys() vals = kwargs.values() for instance in itertools.product(*vals): yield dict(zip(keys, instance)) def param_search(model_func, X, y, params: dict, random_state=123): """ :param model_func: model builder :param X: ndarray (n_trials, n_channels, n_times) :param y: ndarray (n_trials, ) :param params: dict of params, key is param name and value is search range :param random_state: :return: """ kfold = KFold(n_splits=10, shuffle=True, random_state=random_state) best_auc = -1 best_param = None for p_dict in product_dict(**params): model = model_func(**p_dict) n_classes = len(np.unique(y)) y_pred = np.zeros((len(y), n_classes)) for train_idx, test_idx in kfold.split(X): X_train, y_train = X[train_idx], y[train_idx] X_test = X[test_idx] model.fit(X_train, y_train) y_pred[test_idx] = model.predict_proba(X_test) auc = multiclass_auc_score(y, y_pred, n_classes=n_classes) # update if auc > best_auc: best_param = p_dict best_auc = auc # print each steps logger.debug(f'Current: {p_dict}, {auc}; Best: {best_param}, {best_auc}') return best_auc, best_param def multiclass_auc_score(y_true, prob, n_classes=None): if n_classes is None: n_classes = len(np.unique(y_true)) if n_classes > 2: auc = roc_auc_score(y_true, prob, multi_class='ovr') elif n_classes == 2: auc = roc_auc_score(y_true, prob[:, 1]) else: raise ValueError return auc