import os import argparse from datetime import datetime import random from psychopy import visual, core, event, logging from device.data_client import NeuracleDataClient from device.trigger_box import TriggerNeuracle from device.fubo_pneumatic_finger import FingerController from device.arduino import send_toggle_command,connect_to_arduino from settings.config import settings from bci_core.online import Controller, model_loader config_info = settings.CONFIG_INFO fingermodel_ids_inverse = settings.FINGERMODEL_IDS_INVERSE # get train params def parse_args(): parser = argparse.ArgumentParser( description='Hand gesture train' ) parser.add_argument( '--subj', dest='subj', help='Subject name', default=None, type=str ) parser.add_argument( '--hand-port', dest='hand_port', help='Peripheral serial port', type=str ) parser.add_argument( '--trigger-port', dest='trigger_port', help='Triggerbox serial port', type=str ) parser.add_argument( '--arduino-port', dest='arduino_port', help='Arduino serial port', type=str ) parser.add_argument( '--state-change-threshold', '-scth', dest='state_change_threshold', help='Threshold for HMM state change', type=float ) parser.add_argument( '--state-trans-prob', '-stp', dest='state_trans_prob', help='Transition probability for HMM state change', default=0.8, type=float ) parser.add_argument( '--momentum', help='Probability update momentum', default=0.5, type=float ) parser.add_argument( '--model-filename', dest='model_filename', help='Model file', default=None, type=str ) parser.add_argument( '--debug', action='store_true', help='Store true to debug progress' ) return parser.parse_args() args = parse_args() # setup logger logging.console.setLevel(logging.INFO) datetime_now = datetime.now().strftime("%Y%m%d-%H%M%S") log_file_path = os.path.join(settings.DATA_PATH, args.subj, f"freegrasping_{datetime_now}.log") logger = logging.LogFile(log_file_path, level=logging.INFO, filemode='w') # initialize devices and models if not args.debug: # load model model_path = os.path.join(settings.MODEL_PATH, args.subj, args.model_filename) input_kwargs = { 'state_trans_prob': args.state_trans_prob, 'state_change_threshold': args.state_change_threshold, 'momentum': args.momentum } control_model = model_loader(model_path, **input_kwargs) # build bci controller controller = Controller(0., control_model, reref_method=config_info['reref']) # connect neo receiver = NeuracleDataClient(n_channel=len(config_info['channel_labels']), samplerate=config_info['sample_rate'], host=config_info['host'], port=config_info['port'], buffer_len=config_info['buffer_length']) # connect to trigger box trigger = TriggerNeuracle(port=args.trigger_port) # connect to mechanical hand hand_device = FingerController(mode='step', init_params={'port': args.hand_port}) serial_port = args.arduino_port baud_rate = 9600 # Connect to Arduino ser = connect_to_arduino(serial_port, baud_rate) # setup window win = visual.Window( size=[1920, 1080], fullscr=True, screen=0, winType='pyglet', allowStencil=False, monitor='testMonitor', color=[1,1,1], colorSpace='rgb', backgroundImage='', backgroundFit='none', blendMode='avg', useFBO=True, units='height' ) fps = win.getActualFrameRate() update_interval = 0.1 # second # --- Initialize components for Routine "decision" --- feedback_bar = visual.Progress( win, name='feedback_bar', progress=0, pos=(0, -0.25), size=(0.5, 0.1), anchor='bottom-left', units='height', barColor='black', backColor=None, borderColor='black', colorSpace='rgb', lineWidth=4.0, opacity=1.0, ori=270.0, depth=0 ) text_message = visual.TextStim(win=win, name='text', text='您将在接下来的任务中自主控制气动手,\n进度条提示您当前时刻的抓握力度。\n希望气动手握紧请用力尝试握手,\n希望气动手松开请尝试放松。\n按空格键继续', font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='black', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=0.0) text_message.draw() win.flip() event.waitKeys() while True: keys = event.getKeys() if 'escape' in keys: break if not args.debug: data_from_buffer = receiver.get_trial_data(clear=False) decision = controller.decision(data_from_buffer, None) force = controller.real_feedback_model.probability[1] else: decision = random.randint(-1, 1) force = random.random() current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") logging.exp(f'Prob: {force} - current time: {current_time}') feedback_bar.progress = force feedback_bar.draw() win.flip() if decision != -1: # logging decision change current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") logging.exp(f'Decision: {decision:d} - current time: {current_time}') if not args.debug: trigger.send_trigger(int(decision)) hand_device.move(action=fingermodel_ids_inverse[decision]) send_toggle_command(ser) # if decision == 0: # only when state changes to rest, give a freeze time # core.wait(3) else: core.wait(0.1) # exit exp if not args.debug: receiver.close() logging.flush()