123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- import os
- import argparse
- from datetime import datetime
- import random
- from psychopy import visual, core, event, logging
- from device.data_client import NeuracleDataClient
- from device.trigger_box import TriggerNeuracle
- from device.fubo_pneumatic_finger import FingerController
- from device.arduino import send_toggle_command,connect_to_arduino
- from settings.config import settings
- from bci_core.online import Controller, model_loader
- config_info = settings.CONFIG_INFO
- fingermodel_ids_inverse = settings.FINGERMODEL_IDS_INVERSE
- # get train params
- def parse_args():
- parser = argparse.ArgumentParser(
- description='Hand gesture train'
- )
- parser.add_argument(
- '--subj',
- dest='subj',
- help='Subject name',
- default=None,
- type=str
- )
- parser.add_argument(
- '--hand-port',
- dest='hand_port',
- help='Peripheral serial port',
- type=str
- )
- parser.add_argument(
- '--trigger-port',
- dest='trigger_port',
- help='Triggerbox serial port',
- type=str
- )
- parser.add_argument(
- '--arduino-port',
- dest='arduino_port',
- help='Arduino serial port',
- type=str
- )
- parser.add_argument(
- '--state-change-threshold',
- '-scth',
- dest='state_change_threshold',
- help='Threshold for HMM state change',
- type=float
- )
- parser.add_argument(
- '--state-trans-prob',
- '-stp',
- dest='state_trans_prob',
- help='Transition probability for HMM state change',
- default=0.8,
- type=float
- )
- parser.add_argument(
- '--momentum',
- help='Probability update momentum',
- default=0.5,
- type=float
- )
- parser.add_argument(
- '--model-filename',
- dest='model_filename',
- help='Model file',
- default=None,
- type=str
- )
- parser.add_argument(
- '--debug',
- action='store_true',
- help='Store true to debug progress'
- )
- return parser.parse_args()
- args = parse_args()
- # setup logger
- logging.console.setLevel(logging.INFO)
- datetime_now = datetime.now().strftime("%Y%m%d-%H%M%S")
- log_file_path = os.path.join(settings.DATA_PATH, args.subj, f"freegrasping_{datetime_now}.log")
- logger = logging.LogFile(log_file_path, level=logging.INFO, filemode='w')
- # initialize devices and models
- if not args.debug:
- # load model
- model_path = os.path.join(settings.MODEL_PATH, args.subj, args.model_filename)
- input_kwargs = {
- 'state_trans_prob': args.state_trans_prob,
- 'state_change_threshold': args.state_change_threshold,
- 'momentum': args.momentum
- }
- control_model = model_loader(model_path, **input_kwargs)
- # build bci controller
- controller = Controller(0., control_model, reref_method=config_info['reref'])
- # connect neo
- receiver = NeuracleDataClient(n_channel=len(config_info['channel_labels']),
- samplerate=config_info['sample_rate'],
- host=config_info['host'],
- port=config_info['port'],
- buffer_len=config_info['buffer_length'])
- # connect to trigger box
- trigger = TriggerNeuracle(port=args.trigger_port)
- # connect to mechanical hand
- hand_device = FingerController(mode='step', init_params={'port': args.hand_port})
- serial_port = args.arduino_port
- baud_rate = 9600
- # Connect to Arduino
- ser = connect_to_arduino(serial_port, baud_rate)
- # setup window
- win = visual.Window(
- size=[1920, 1080], fullscr=True, screen=0,
- winType='pyglet', allowStencil=False,
- monitor='testMonitor', color=[1,1,1], colorSpace='rgb',
- backgroundImage='', backgroundFit='none',
- blendMode='avg', useFBO=True,
- units='height'
- )
- fps = win.getActualFrameRate()
- update_interval = 0.1 # second
- # --- Initialize components for Routine "decision" ---
- feedback_bar = visual.Progress(
- win, name='feedback_bar',
- progress=0,
- pos=(0, -0.25), size=(0.5, 0.1), anchor='bottom-left', units='height',
- barColor='black', backColor=None, borderColor='black', colorSpace='rgb',
- lineWidth=4.0, opacity=1.0, ori=270.0,
- depth=0
- )
- text_message = visual.TextStim(win=win, name='text',
- text='您将在接下来的任务中自主控制气动手,\n进度条提示您当前时刻的抓握力度。\n希望气动手握紧请用力尝试握手,\n希望气动手松开请尝试放松。\n按空格键继续',
- font='Open Sans',
- pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0,
- color='black', colorSpace='rgb', opacity=None,
- languageStyle='LTR',
- depth=0.0)
- text_message.draw()
- win.flip()
- event.waitKeys()
- while True:
- keys = event.getKeys()
- if 'escape' in keys:
- break
- if not args.debug:
- data_from_buffer = receiver.get_trial_data(clear=False)
- decision = controller.decision(data_from_buffer, None)
- force = controller.real_feedback_model.probability[1]
- else:
- decision = random.randint(-1, 1)
- force = random.random()
- current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
- logging.exp(f'Prob: {force} - current time: {current_time}')
- feedback_bar.progress = force
- feedback_bar.draw()
- win.flip()
- if decision != -1:
- # logging decision change
- current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
- logging.exp(f'Decision: {decision:d} - current time: {current_time}')
- if not args.debug:
- trigger.send_trigger(int(decision))
- hand_device.move(action=fingermodel_ids_inverse[decision])
- send_toggle_command(ser)
- # if decision == 0: # only when state changes to rest, give a freeze time
- # core.wait(3)
- else:
- core.wait(0.1)
- # exit exp
- if not args.debug:
- receiver.close()
- logging.flush()
|