#!/usr/bin/env python # -*- coding: utf-8 -*- """ This experiment was created using PsychoPy3 Experiment Builder (v2023.2.3), on Tue Dec 12 13:08:19 2023 If you publish work using this script the most relevant publication is: Peirce J, Gray JR, Simpson S, MacAskill M, Höchenberger R, Sogo H, Kastman E, Lindeløv JK. (2019) PsychoPy2: Experiments in behavior made easy Behav Res 51: 195. https://doi.org/10.3758/s13428-018-01193-y """ # --- Import packages --- from psychopy import locale_setup from psychopy import prefs from psychopy import plugins plugins.activatePlugins() prefs.hardware['audioLib'] = 'ptb' prefs.hardware['audioLatencyMode'] = '3' from psychopy import sound, gui, visual, core, data, event, logging, clock, colors, layout from psychopy.tools import environmenttools from psychopy.constants import (NOT_STARTED, STARTED, PLAYING, PAUSED, STOPPED, FINISHED, PRESSED, RELEASED, FOREVER, priority) import numpy as np # whole numpy lib is available, prepend 'np.' from numpy import (sin, cos, tan, log, log10, pi, average, sqrt, std, deg2rad, rad2deg, linspace, asarray) from numpy.random import random, randint, normal, shuffle, choice as randchoice import os # handy system and path functions import sys # to get file system encoding from psychopy.hardware import keyboard # Run 'Before Experiment' code from config import os import datetime from time import sleep import argparse from device.data_client import NeuracleDataClient from device.trigger_box import TriggerNeuracle from device.fubo_pneumatic_finger import FuboPneumaticFingerClient from settings.config import settings from bci_core.online import Controller, model_loader from settings.config import settings config_info = settings.CONFIG_INFO # get train params def parse_args(): parser = argparse.ArgumentParser( description='Hand gesture train' ) parser.add_argument( '--subj', dest='subj', help='Subject name', default=None, type=str ) parser.add_argument( '--n-trials', dest='n_trials', help='Trial number', type=int, ) parser.add_argument( '--hand-feedback', dest='hand_feedback', action='store_true', ) parser.add_argument( '--com', dest='com', help='Peripheral serial port', type=str ) parser.add_argument( '--finger-model', '-fm', dest='finger_model', help='Gesture to train', type=str ) parser.add_argument( '--virtual-feedback-rate', '-vfr', dest='virtual_feedback_rate', help='Virtual feedback rate', type=float ) parser.add_argument( '--difficulty', help='Task difficultys', type=str ) parser.add_argument( '--model-path', dest='model_path', help='Path to model file', default=None, type=str ) return parser.parse_args() args = parse_args() # connect neo receiver = NeuracleDataClient(n_channel=len(config_info['channel_labels']), samplerate=config_info['sample_rate'], host=config_info['host'], port=config_info['port'], buffer_len=config_info['buffer_length']) # connect to trigger box trigger = TriggerNeuracle() if args.hand_feedback: # connect to mechanical hand hand_device = FuboPneumaticFingerClient({'port': args.com}) # build bci controller control_model = model_loader(args.model_path) controller = Controller(args.virtual_feedback_rate, control_model) # Run 'Before Experiment' code from decision cnt_threshold_table = { 'easy': 3, 'mid': 4, 'hard': 5 } cnt_threshold = cnt_threshold_table[args.difficulty] correct_cnt = 0 # --- Setup global variables (available in all functions) --- # Ensure that relative paths start from the same directory as this script _thisDir = os.path.dirname(os.path.abspath(__file__)) # Store info about the experiment session psychopyVersion = '2023.2.3' expName = 'train' # from the Builder filename that created this script expInfo = { 'participant': f"{randint(0, 999999):06.0f}", 'session': '001', 'date': data.getDateStr(), # add a simple timestamp 'expName': expName, 'psychopyVersion': psychopyVersion, } def showExpInfoDlg(expInfo): """ Show participant info dialog. Parameters ========== expInfo : dict Information about this experiment, created by the `setupExpInfo` function. Returns ========== dict Information about this experiment. """ # temporarily remove keys which the dialog doesn't need to show poppedKeys = { 'date': expInfo.pop('date', data.getDateStr()), 'expName': expInfo.pop('expName', expName), 'psychopyVersion': expInfo.pop('psychopyVersion', psychopyVersion), } # show participant info dialog dlg = gui.DlgFromDict(dictionary=expInfo, sortKeys=False, title=expName) if dlg.OK == False: core.quit() # user pressed cancel # restore hidden keys expInfo.update(poppedKeys) # return expInfo return expInfo def setupData(expInfo, dataDir=None): """ Make an ExperimentHandler to handle trials and saving. Parameters ========== expInfo : dict Information about this experiment, created by the `setupExpInfo` function. dataDir : Path, str or None Folder to save the data to, leave as None to create a folder in the current directory. Returns ========== psychopy.data.ExperimentHandler Handler object for this experiment, contains the data to save and information about where to save it to. """ # data file name stem = absolute path + name; later add .psyexp, .csv, .log, etc if dataDir is None: dataDir = _thisDir filename = u'data/%s_%s_%s' % (expInfo['participant'], expName, expInfo['date']) # make sure filename is relative to dataDir if os.path.isabs(filename): dataDir = os.path.commonprefix([dataDir, filename]) filename = os.path.relpath(filename, dataDir) # an ExperimentHandler isn't essential but helps with data saving thisExp = data.ExperimentHandler( name=expName, version='', extraInfo=expInfo, runtimeInfo=None, originPath='/Users/dingkunliu/Projects/MI-BCI-Proj/kraken/backend/general_grasp_training.py', savePickle=True, saveWideText=True, dataFileName=dataDir + os.sep + filename, sortColumns='time' ) thisExp.setPriority('thisRow.t', priority.CRITICAL) thisExp.setPriority('expName', priority.LOW) # return experiment handler return thisExp def setupLogging(filename): """ Setup a log file and tell it what level to log at. Parameters ========== filename : str or pathlib.Path Filename to save log file and data files as, doesn't need an extension. Returns ========== psychopy.logging.LogFile Text stream to receive inputs from the logging system. """ # this outputs to the screen, not a file logging.console.setLevel(logging.DEBUG) # save a log file for detail verbose info logFile = logging.LogFile(filename+'.log', level=logging.DEBUG) return logFile def setupWindow(expInfo=None, win=None): """ Setup the Window Parameters ========== expInfo : dict Information about this experiment, created by the `setupExpInfo` function. win : psychopy.visual.Window Window to setup - leave as None to create a new window. Returns ========== psychopy.visual.Window Window in which to run this experiment. """ if win is None: # if not given a window to setup, make one win = visual.Window( size=[1440, 900], fullscr=True, screen=0, winType='pyglet', allowStencil=False, monitor='testMonitor', color=[1,1,1], colorSpace='rgb', backgroundImage='', backgroundFit='none', blendMode='avg', useFBO=True, units='height' ) if expInfo is not None: # store frame rate of monitor if we can measure it expInfo['frameRate'] = win.getActualFrameRate() else: # if we have a window, just set the attributes which are safe to set win.color = [1,1,1] win.colorSpace = 'rgb' win.backgroundImage = '' win.backgroundFit = 'none' win.units = 'height' win.mouseVisible = False win.hideMessage() return win def setupInputs(expInfo, thisExp, win): """ Setup whatever inputs are available (mouse, keyboard, eyetracker, etc.) Parameters ========== expInfo : dict Information about this experiment, created by the `setupExpInfo` function. thisExp : psychopy.data.ExperimentHandler Handler object for this experiment, contains the data to save and information about where to save it to. win : psychopy.visual.Window Window in which to run this experiment. Returns ========== dict Dictionary of input devices by name. """ # --- Setup input devices --- inputs = {} ioConfig = {} ioSession = ioServer = eyetracker = None # create a default keyboard (e.g. to check for escape) defaultKeyboard = keyboard.Keyboard(backend='ptb') # return inputs dict return { 'ioServer': ioServer, 'defaultKeyboard': defaultKeyboard, 'eyetracker': eyetracker, } def pauseExperiment(thisExp, inputs=None, win=None, timers=[], playbackComponents=[]): """ Pause this experiment, preventing the flow from advancing to the next routine until resumed. Parameters ========== thisExp : psychopy.data.ExperimentHandler Handler object for this experiment, contains the data to save and information about where to save it to. inputs : dict Dictionary of input devices by name. win : psychopy.visual.Window Window for this experiment. timers : list, tuple List of timers to reset once pausing is finished. playbackComponents : list, tuple List of any components with a `pause` method which need to be paused. """ # if we are not paused, do nothing if thisExp.status != PAUSED: return # pause any playback components for comp in playbackComponents: comp.pause() # prevent components from auto-drawing win.stashAutoDraw() # run a while loop while we wait to unpause while thisExp.status == PAUSED: # make sure we have a keyboard if inputs is None: inputs = { 'defaultKeyboard': keyboard.Keyboard(backend='PsychToolbox') } # check for quit (typically the Esc key) if inputs['defaultKeyboard'].getKeys(keyList=['escape']): endExperiment(thisExp, win=win, inputs=inputs) # flip the screen win.flip() # if stop was requested while paused, quit if thisExp.status == FINISHED: endExperiment(thisExp, inputs=inputs, win=win) # resume any playback components for comp in playbackComponents: comp.play() # restore auto-drawn components win.retrieveAutoDraw() # reset any timers for timer in timers: timer.reset() def run(expInfo, thisExp, win, inputs, globalClock=None, thisSession=None): """ Run the experiment flow. Parameters ========== expInfo : dict Information about this experiment, created by the `setupExpInfo` function. thisExp : psychopy.data.ExperimentHandler Handler object for this experiment, contains the data to save and information about where to save it to. psychopy.visual.Window Window in which to run this experiment. inputs : dict Dictionary of input devices by name. globalClock : psychopy.core.clock.Clock or None Clock to get global time from - supply None to make a new one. thisSession : psychopy.session.Session or None Handle of the Session object this experiment is being run from, if any. """ # mark experiment as started thisExp.status = STARTED # make sure variables created by exec are available globally exec = environmenttools.setExecEnvironment(globals()) # get device handles from dict of input devices ioServer = inputs['ioServer'] defaultKeyboard = inputs['defaultKeyboard'] eyetracker = inputs['eyetracker'] # make sure we're running in the directory for this experiment os.chdir(_thisDir) # get filename from ExperimentHandler for convenience filename = thisExp.dataFileName frameTolerance = 0.001 # how close to onset before 'same' frame endExpNow = False # flag for 'escape' or other condition => quit the exp # get frame duration from frame rate in expInfo if 'frameRate' in expInfo and expInfo['frameRate'] is not None: frameDur = 1.0 / round(expInfo['frameRate']) else: frameDur = 1.0 / 60.0 # could not measure, so guess # Start Code - component code to be run after the window creation # --- Initialize components for Routine "before_mi" --- train_position = visual.TextStim(win=win, name='train_position', text='训练部位:右手', font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='black', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=0.0); instruction = visual.TextStim(win=win, name='instruction', text='准备进行一般抓握训练,\n按空格键继续', font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='black', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=-1.0); key_resp = keyboard.Keyboard() # --- Initialize components for Routine "mi_prepare" --- text = visual.TextStim(win=win, name='text', text='请准备', font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='black', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=-1.0); # --- Initialize components for Routine "mi_begin" --- img_right = visual.ImageStim( win=win, name='img_right', image='static/images/hand_move.png', mask=None, anchor='center', ori=0.0, pos=(0, 0), size=None, color=[1,1,1], colorSpace='rgb', opacity=None, flipHoriz=False, flipVert=False, texRes=128.0, interpolate=True, depth=0.0) # --- Initialize components for Routine "decision" --- # --- Initialize components for Routine "mi_feedback" --- feedback = visual.TextStim(win=win, name='feedback', text=None, font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='black', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=0.0); # --- Initialize components for Routine "mi_feedback_2" --- feedback_2 = visual.TextStim(win=win, name='feedback_2', text=None, font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='white', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=0.0); # --- Initialize components for Routine "mi_rest" --- img_rest = visual.ImageStim( win=win, name='img_rest', image='static/images/rest.png', mask=None, anchor='center', ori=0.0, pos=(0, 0), size=None, color=[1,1,1], colorSpace='rgb', opacity=None, flipHoriz=False, flipVert=False, texRes=128.0, interpolate=True, depth=0.0) # --- Initialize components for Routine "end" --- mi_end = visual.TextStim(win=win, name='mi_end', text='结束实验', font='Open Sans', pos=(0, 0), height=0.05, wrapWidth=None, ori=0.0, color='black', colorSpace='rgb', opacity=None, languageStyle='LTR', depth=0.0); # create some handy timers if globalClock is None: globalClock = core.Clock() # to track the time since experiment started if ioServer is not None: ioServer.syncClock(globalClock) logging.setDefaultClock(globalClock) routineTimer = core.Clock() # to track time remaining of each (possibly non-slip) routine win.flip() # flip window to reset last flip timer # store the exact time the global clock started expInfo['expStart'] = data.getDateStr(format='%Y-%m-%d %Hh%M.%S.%f %z', fractionalSecondDigits=6) # --- Prepare to start Routine "before_mi" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('before_mi.started', globalClock.getTime()) key_resp.keys = [] key_resp.rt = [] _key_resp_allKeys = [] # keep track of which components have finished before_miComponents = [train_position, instruction, key_resp] for thisComponent in before_miComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "before_mi" --- routineForceEnded = not continueRoutine while continueRoutine: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *train_position* updates # if train_position is starting this frame... if train_position.status == NOT_STARTED and tThisFlip >= 0.0-frameTolerance: # keep track of start time/frame for later train_position.frameNStart = frameN # exact frame index train_position.tStart = t # local t and not account for scr refresh train_position.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(train_position, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'train_position.started') # update status train_position.status = STARTED train_position.setAutoDraw(True) # if train_position is active this frame... if train_position.status == STARTED: # update params pass # if train_position is stopping this frame... if train_position.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > train_position.tStartRefresh + 2-frameTolerance: # keep track of stop time/frame for later train_position.tStop = t # not accounting for scr refresh train_position.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'train_position.stopped') # update status train_position.status = FINISHED train_position.setAutoDraw(False) # *instruction* updates # if instruction is starting this frame... if instruction.status == NOT_STARTED and tThisFlip >= 2-frameTolerance: # keep track of start time/frame for later instruction.frameNStart = frameN # exact frame index instruction.tStart = t # local t and not account for scr refresh instruction.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(instruction, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'instruction.started') # update status instruction.status = STARTED instruction.setAutoDraw(True) # if instruction is active this frame... if instruction.status == STARTED: # update params pass # *key_resp* updates waitOnFlip = False # if key_resp is starting this frame... if key_resp.status == NOT_STARTED and tThisFlip >= 2-frameTolerance: # keep track of start time/frame for later key_resp.frameNStart = frameN # exact frame index key_resp.tStart = t # local t and not account for scr refresh key_resp.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(key_resp, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'key_resp.started') # update status key_resp.status = STARTED # keyboard checking is just starting waitOnFlip = True win.callOnFlip(key_resp.clock.reset) # t=0 on next screen flip win.callOnFlip(key_resp.clearEvents, eventType='keyboard') # clear events on next screen flip if key_resp.status == STARTED and not waitOnFlip: theseKeys = key_resp.getKeys(keyList=['space'], ignoreKeys=["escape"], waitRelease=False) _key_resp_allKeys.extend(theseKeys) if len(_key_resp_allKeys): key_resp.keys = _key_resp_allKeys[-1].name # just the last key pressed key_resp.rt = _key_resp_allKeys[-1].rt key_resp.duration = _key_resp_allKeys[-1].duration # a response ends the routine continueRoutine = False # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in before_miComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "before_mi" --- for thisComponent in before_miComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('before_mi.stopped', globalClock.getTime()) # check responses if key_resp.keys in ['', [], None]: # No response was made key_resp.keys = None thisExp.addData('key_resp.keys',key_resp.keys) if key_resp.keys != None: # we had a response thisExp.addData('key_resp.rt', key_resp.rt) thisExp.addData('key_resp.duration', key_resp.duration) thisExp.nextEntry() # the Routine "before_mi" was not non-slip safe, so reset the non-slip timer routineTimer.reset() # set up handler to look after randomisation of conditions etc trials = data.TrialHandler(nReps=args.n_trials, method='sequential', extraInfo=expInfo, originPath=-1, trialList=[None], seed=None, name='trials') thisExp.addLoop(trials) # add the loop to the experiment thisTrial = trials.trialList[0] # so we can initialise stimuli with some values # abbreviate parameter names if possible (e.g. rgb = thisTrial.rgb) if thisTrial != None: for paramName in thisTrial: globals()[paramName] = thisTrial[paramName] for thisTrial in trials: currentLoop = trials thisExp.timestampOnFlip(win, 'thisRow.t') # pause experiment here if requested if thisExp.status == PAUSED: pauseExperiment( thisExp=thisExp, inputs=inputs, win=win, timers=[routineTimer], playbackComponents=[] ) # abbreviate parameter names if possible (e.g. rgb = thisTrial.rgb) if thisTrial != None: for paramName in thisTrial: globals()[paramName] = thisTrial[paramName] # --- Prepare to start Routine "mi_prepare" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('mi_prepare.started', globalClock.getTime()) # Run 'Begin Routine' code from initialize_buffer decision_buffer = [] # keep track of which components have finished mi_prepareComponents = [text] for thisComponent in mi_prepareComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "mi_prepare" --- routineForceEnded = not continueRoutine while continueRoutine and routineTimer.getTime() < 1.5: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *text* updates # if text is starting this frame... if text.status == NOT_STARTED and tThisFlip >= 0.0-frameTolerance: # keep track of start time/frame for later text.frameNStart = frameN # exact frame index text.tStart = t # local t and not account for scr refresh text.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(text, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'text.started') # update status text.status = STARTED text.setAutoDraw(True) # if text is active this frame... if text.status == STARTED: # update params pass # if text is stopping this frame... if text.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > text.tStartRefresh + 1.5-frameTolerance: # keep track of stop time/frame for later text.tStop = t # not accounting for scr refresh text.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'text.stopped') # update status text.status = FINISHED text.setAutoDraw(False) # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in mi_prepareComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "mi_prepare" --- for thisComponent in mi_prepareComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('mi_prepare.stopped', globalClock.getTime()) # using non-slip timing so subtract the expected duration of this Routine (unless ended on request) if routineForceEnded: routineTimer.reset() else: routineTimer.addTime(-1.500000) # set up handler to look after randomisation of conditions etc classification = data.TrialHandler(nReps=5.0, method='sequential', extraInfo=expInfo, originPath=-1, trialList=[None], seed=None, name='classification') thisExp.addLoop(classification) # add the loop to the experiment thisClassification = classification.trialList[0] # so we can initialise stimuli with some values # abbreviate parameter names if possible (e.g. rgb = thisClassification.rgb) if thisClassification != None: for paramName in thisClassification: globals()[paramName] = thisClassification[paramName] for thisClassification in classification: currentLoop = classification thisExp.timestampOnFlip(win, 'thisRow.t') # pause experiment here if requested if thisExp.status == PAUSED: pauseExperiment( thisExp=thisExp, inputs=inputs, win=win, timers=[routineTimer], playbackComponents=[] ) # abbreviate parameter names if possible (e.g. rgb = thisClassification.rgb) if thisClassification != None: for paramName in thisClassification: globals()[paramName] = thisClassification[paramName] # --- Prepare to start Routine "mi_begin" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('mi_begin.started', globalClock.getTime()) # Run 'Begin Routine' code from algo # send trigger current_true_label = settings.FINGERMODEL_IDS[args.finger_model] win.callOnFlip(trigger.send_trigger, current_true_label) # keep track of which components have finished mi_beginComponents = [img_right] for thisComponent in mi_beginComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "mi_begin" --- routineForceEnded = not continueRoutine while continueRoutine and routineTimer.getTime() < 1.0: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *img_right* updates # if img_right is starting this frame... if img_right.status == NOT_STARTED and tThisFlip >= 0.0-frameTolerance: # keep track of start time/frame for later img_right.frameNStart = frameN # exact frame index img_right.tStart = t # local t and not account for scr refresh img_right.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(img_right, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'img_right.started') # update status img_right.status = STARTED img_right.setAutoDraw(True) # if img_right is active this frame... if img_right.status == STARTED: # update params pass # if img_right is stopping this frame... if img_right.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > img_right.tStartRefresh + 1-frameTolerance: # keep track of stop time/frame for later img_right.tStop = t # not accounting for scr refresh img_right.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'img_right.stopped') # update status img_right.status = FINISHED img_right.setAutoDraw(False) # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in mi_beginComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "mi_begin" --- for thisComponent in mi_beginComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('mi_begin.stopped', globalClock.getTime()) # Run 'End Routine' code from algo data_from_buffer = receiver.get_trial_data(clear=False) decision = controller.step_decision(data_from_buffer, current_true_label) decision_buffer.append(decision) # write decision to data logging.exp('decision: {}'.format(decision)) # using non-slip timing so subtract the expected duration of this Routine (unless ended on request) if routineForceEnded: routineTimer.reset() else: routineTimer.addTime(-1.000000) # completed 5.0 repeats of 'classification' # --- Prepare to start Routine "decision" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('decision.started', globalClock.getTime()) # Run 'Begin Routine' code from decision cnt = 0 for d in decision_buffer: if d == current_true_label: cnt += 1 success = False feedback_time = 2 global correct_cnt if cnt >= cnt_threshold: success = True correct_cnt += 1 # count all correct trials if args.hand_feedback: feedback_time = 10 # keep track of which components have finished decisionComponents = [] for thisComponent in decisionComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "decision" --- routineForceEnded = not continueRoutine while continueRoutine: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in decisionComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "decision" --- for thisComponent in decisionComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('decision.stopped', globalClock.getTime()) # the Routine "decision" was not non-slip safe, so reset the non-slip timer routineTimer.reset() # --- Prepare to start Routine "mi_feedback" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('mi_feedback.started', globalClock.getTime()) # Run 'Begin Routine' code from code if success: feedback.text = '恭喜!' if args.hand_feedback: hand_device.start(args.finger_model) else: feedback.text = '继续努力!' # keep track of which components have finished mi_feedbackComponents = [feedback] for thisComponent in mi_feedbackComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "mi_feedback" --- routineForceEnded = not continueRoutine while continueRoutine: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *feedback* updates # if feedback is starting this frame... if feedback.status == NOT_STARTED and tThisFlip >= 0-frameTolerance: # keep track of start time/frame for later feedback.frameNStart = frameN # exact frame index feedback.tStart = t # local t and not account for scr refresh feedback.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(feedback, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'feedback.started') # update status feedback.status = STARTED feedback.setAutoDraw(True) # if feedback is active this frame... if feedback.status == STARTED: # update params pass # if feedback is stopping this frame... if feedback.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > feedback.tStartRefresh + feedback_time / 2-frameTolerance: # keep track of stop time/frame for later feedback.tStop = t # not accounting for scr refresh feedback.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'feedback.stopped') # update status feedback.status = FINISHED feedback.setAutoDraw(False) # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in mi_feedbackComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "mi_feedback" --- for thisComponent in mi_feedbackComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('mi_feedback.stopped', globalClock.getTime()) # the Routine "mi_feedback" was not non-slip safe, so reset the non-slip timer routineTimer.reset() # --- Prepare to start Routine "mi_feedback_2" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('mi_feedback_2.started', globalClock.getTime()) # Run 'Begin Routine' code from code_2 if success: feedback_2.text = '恭喜!' if args.hand_feedback: hand_device.extend() else: feedback_2.text = '继续努力!' # keep track of which components have finished mi_feedback_2Components = [feedback_2] for thisComponent in mi_feedback_2Components: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "mi_feedback_2" --- routineForceEnded = not continueRoutine while continueRoutine: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *feedback_2* updates # if feedback_2 is starting this frame... if feedback_2.status == NOT_STARTED and tThisFlip >= 0.0-frameTolerance: # keep track of start time/frame for later feedback_2.frameNStart = frameN # exact frame index feedback_2.tStart = t # local t and not account for scr refresh feedback_2.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(feedback_2, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'feedback_2.started') # update status feedback_2.status = STARTED feedback_2.setAutoDraw(True) # if feedback_2 is active this frame... if feedback_2.status == STARTED: # update params pass # if feedback_2 is stopping this frame... if feedback_2.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > feedback_2.tStartRefresh + feedback_time / 2-frameTolerance: # keep track of stop time/frame for later feedback_2.tStop = t # not accounting for scr refresh feedback_2.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'feedback_2.stopped') # update status feedback_2.status = FINISHED feedback_2.setAutoDraw(False) # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in mi_feedback_2Components: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "mi_feedback_2" --- for thisComponent in mi_feedback_2Components: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('mi_feedback_2.stopped', globalClock.getTime()) # the Routine "mi_feedback_2" was not non-slip safe, so reset the non-slip timer routineTimer.reset() # --- Prepare to start Routine "mi_rest" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('mi_rest.started', globalClock.getTime()) # Run 'Begin Routine' code from trigger_rest # send trigger win.callOnFlip(trigger.send_trigger, 0) # keep track of which components have finished mi_restComponents = [img_rest] for thisComponent in mi_restComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "mi_rest" --- routineForceEnded = not continueRoutine while continueRoutine and routineTimer.getTime() < 5.0: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *img_rest* updates # if img_rest is starting this frame... if img_rest.status == NOT_STARTED and tThisFlip >= 0.0-frameTolerance: # keep track of start time/frame for later img_rest.frameNStart = frameN # exact frame index img_rest.tStart = t # local t and not account for scr refresh img_rest.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(img_rest, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'img_rest.started') # update status img_rest.status = STARTED img_rest.setAutoDraw(True) # if img_rest is active this frame... if img_rest.status == STARTED: # update params pass # if img_rest is stopping this frame... if img_rest.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > img_rest.tStartRefresh + 5-frameTolerance: # keep track of stop time/frame for later img_rest.tStop = t # not accounting for scr refresh img_rest.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'img_rest.stopped') # update status img_rest.status = FINISHED img_rest.setAutoDraw(False) # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in mi_restComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "mi_rest" --- for thisComponent in mi_restComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('mi_rest.stopped', globalClock.getTime()) # using non-slip timing so subtract the expected duration of this Routine (unless ended on request) if routineForceEnded: routineTimer.reset() else: routineTimer.addTime(-5.000000) thisExp.nextEntry() if thisSession is not None: # if running in a Session with a Liaison client, send data up to now thisSession.sendExperimentData() # completed args.n_trials repeats of 'trials' # --- Prepare to start Routine "end" --- continueRoutine = True # update component parameters for each repeat thisExp.addData('end.started', globalClock.getTime()) # Run 'Begin Routine' code from score mi_end.text = f"实验结束,\n得分:{int(correct_cnt / args.n_trials * 100)}" # keep track of which components have finished endComponents = [mi_end] for thisComponent in endComponents: thisComponent.tStart = None thisComponent.tStop = None thisComponent.tStartRefresh = None thisComponent.tStopRefresh = None if hasattr(thisComponent, 'status'): thisComponent.status = NOT_STARTED # reset timers t = 0 _timeToFirstFrame = win.getFutureFlipTime(clock="now") frameN = -1 # --- Run Routine "end" --- routineForceEnded = not continueRoutine while continueRoutine and routineTimer.getTime() < 5.0: # get current time t = routineTimer.getTime() tThisFlip = win.getFutureFlipTime(clock=routineTimer) tThisFlipGlobal = win.getFutureFlipTime(clock=None) frameN = frameN + 1 # number of completed frames (so 0 is the first frame) # update/draw components on each frame # *mi_end* updates # if mi_end is starting this frame... if mi_end.status == NOT_STARTED and tThisFlip >= 0.0-frameTolerance: # keep track of start time/frame for later mi_end.frameNStart = frameN # exact frame index mi_end.tStart = t # local t and not account for scr refresh mi_end.tStartRefresh = tThisFlipGlobal # on global time win.timeOnFlip(mi_end, 'tStartRefresh') # time at next scr refresh # add timestamp to datafile thisExp.timestampOnFlip(win, 'mi_end.started') # update status mi_end.status = STARTED mi_end.setAutoDraw(True) # if mi_end is active this frame... if mi_end.status == STARTED: # update params pass # if mi_end is stopping this frame... if mi_end.status == STARTED: # is it time to stop? (based on global clock, using actual start) if tThisFlipGlobal > mi_end.tStartRefresh + 5-frameTolerance: # keep track of stop time/frame for later mi_end.tStop = t # not accounting for scr refresh mi_end.frameNStop = frameN # exact frame index # add timestamp to datafile thisExp.timestampOnFlip(win, 'mi_end.stopped') # update status mi_end.status = FINISHED mi_end.setAutoDraw(False) # check for quit (typically the Esc key) if defaultKeyboard.getKeys(keyList=["escape"]): thisExp.status = FINISHED if thisExp.status == FINISHED or endExpNow: endExperiment(thisExp, inputs=inputs, win=win) return # check if all components have finished if not continueRoutine: # a component has requested a forced-end of Routine routineForceEnded = True break continueRoutine = False # will revert to True if at least one component still running for thisComponent in endComponents: if hasattr(thisComponent, "status") and thisComponent.status != FINISHED: continueRoutine = True break # at least one component has not yet finished # refresh the screen if continueRoutine: # don't flip if this routine is over or we'll get a blank screen win.flip() # --- Ending Routine "end" --- for thisComponent in endComponents: if hasattr(thisComponent, "setAutoDraw"): thisComponent.setAutoDraw(False) thisExp.addData('end.stopped', globalClock.getTime()) # using non-slip timing so subtract the expected duration of this Routine (unless ended on request) if routineForceEnded: routineTimer.reset() else: routineTimer.addTime(-5.000000) # Run 'End Experiment' code from config receiver.close() # mark experiment as finished endExperiment(thisExp, win=win, inputs=inputs) def saveData(thisExp): """ Save data from this experiment Parameters ========== thisExp : psychopy.data.ExperimentHandler Handler object for this experiment, contains the data to save and information about where to save it to. """ filename = thisExp.dataFileName # these shouldn't be strictly necessary (should auto-save) thisExp.saveAsWideText(filename + '.csv', delim='auto') thisExp.saveAsPickle(filename) def endExperiment(thisExp, inputs=None, win=None): """ End this experiment, performing final shut down operations. This function does NOT close the window or end the Python process - use `quit` for this. Parameters ========== thisExp : psychopy.data.ExperimentHandler Handler object for this experiment, contains the data to save and information about where to save it to. inputs : dict Dictionary of input devices by name. win : psychopy.visual.Window Window for this experiment. """ if win is not None: # remove autodraw from all current components win.clearAutoDraw() # Flip one final time so any remaining win.callOnFlip() # and win.timeOnFlip() tasks get executed win.flip() # mark experiment handler as finished thisExp.status = FINISHED # shut down eyetracker, if there is one if inputs is not None: if 'eyetracker' in inputs and inputs['eyetracker'] is not None: inputs['eyetracker'].setConnectionState(False) logging.flush() def quit(thisExp, win=None, inputs=None, thisSession=None): """ Fully quit, closing the window and ending the Python process. Parameters ========== win : psychopy.visual.Window Window to close. inputs : dict Dictionary of input devices by name. thisSession : psychopy.session.Session or None Handle of the Session object this experiment is being run from, if any. """ thisExp.abort() # or data files will save again on exit # make sure everything is closed down if win is not None: # Flip one final time so any remaining win.callOnFlip() # and win.timeOnFlip() tasks get executed before quitting win.flip() win.close() if inputs is not None: if 'eyetracker' in inputs and inputs['eyetracker'] is not None: inputs['eyetracker'].setConnectionState(False) logging.flush() if thisSession is not None: thisSession.stop() # terminate Python process core.quit() # if running this experiment as a script... if __name__ == '__main__': # call all functions in order expInfo = showExpInfoDlg(expInfo=expInfo) thisExp = setupData(expInfo=expInfo) logFile = setupLogging(filename=thisExp.dataFileName) win = setupWindow(expInfo=expInfo) inputs = setupInputs(expInfo=expInfo, thisExp=thisExp, win=win) run( expInfo=expInfo, thisExp=thisExp, win=win, inputs=inputs ) saveData(thisExp=thisExp) quit(thisExp=thisExp, win=win, inputs=inputs)