import socket
import struct

import serial
from serial.tools.list_ports import comports


class AttrDict(dict):
    """Dictionary whose items can also be read and written as attributes."""

    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        # Aliasing the instance namespace to the mapping itself makes
        # ``d.key`` and ``d['key']`` share the same storage.
        self.__dict__ = self


class TriggerNeuracle:
    """Thin wrapper that forwards trigger values to a ``TriggerBox``."""

    def __init__(self, port=None, **kwargs):
        """Keep the extra options and open the trigger box.

        Args:
            port: Serial port name passed on to ``TriggerBox``. ``None``
                leaves port discovery to ``TriggerBox``.
            **kwargs: Stored unchanged on ``self.kwargs``.
        """
        self.kwargs = kwargs
        # initiate triggerbox
        self.triggerbox = TriggerBox(port=port)

    def send_trigger(self, data):
        """Send ``data`` as an event through the trigger box serial port."""
        # directly using serial port
        self.triggerbox.OutputEventData(data)


class TriggerBox(object):
    """Serial-port (and optional TCP) connection to a trigger box device."""

    # Function identifiers (presumably the command codes of the device
    # protocol -- TODO confirm against the device documentation).
    functionIDSensorParaGet = 1
    functionIDSensorParaSet = 2
    functionIDDeviceInfoGet = 3
    functionIDDeviceNameGet = 4
    functionIDSensorSampleGet = 5
    functionIDSensorInfoGet = 6
    functionIDOutputEventData = 225
    functionIDError = 131

    # Sensor type identifiers.
    sensorTypeDigitalIN = 1
    sensorTypeLight = 2
    sensorTypeLineIN = 3
    sensorTypeMic = 4
    sensorTypeKey = 5
    sensorTypeTemperature = 6
    sensorTypeHumidity = 7
    sensorTypeAmbientlight = 8
    sensorTypeDebug = 9
    sensorTypeAll = 255

    deviceID = 1  # TODO: get device ID

    # properties
    comportHandle = None
    deviceName = None
    deviceInfo = None
    sensorInfo = None
    tcpOutput = None

    def __init__(self, port=None, tcpPort=None):
        """Open the serial connection and query the device.

        Args:
            port: Serial port name. When ``None``, the ports reported by
                ``comports()`` are scanned and the first one that passes
                ``isValidDevice`` is used.
            tcpPort: When given, a TCP socket is additionally connected to
                ``localhost`` on this port and kept in ``self.tcpOutput``.

        Raises:
            Exception: ``'No available port'`` when ``port`` is ``None`` and
                no port is listed or none of the candidates validates.
        """
        if tcpPort is not None:
            self.tcpOutput = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.tcpOutput.connect(('localhost', tcpPort))

        if port is None:
            plist = comports()
            if not plist:
                raise Exception('No available port')

            validPort = None
            for p in plist:
                candidate = p.device
                # Only probe ports whose name contains one of these markers.
                if 'cu.usbserial' in candidate or 'COM' in candidate:
                    if TriggerBox.isValidDevice(candidate):
                        validPort = candidate
                        break
            if validPort is None:
                raise Exception('No available port')
            port = validPort

        self.comportHandle = serial.Serial(port, 115200, timeout=0.05)
        self.comportHandle.flush()
        self.GetDeviceName()
        self.GetDeviceInfo()
        self.GetSensorInfo()

    @staticmethod
    def isValidDevice(portName):
        """Probe the serial port ``portName`` to validate the device on it."""
        handle = serial.Serial(portName, 115200, timeout=0.05)
        handle.flush()
        # send device message
        # Little-endian: two unsigned bytes (device ID, function ID 4, the
        # value of functionIDDeviceNameGet) and an unsigned short of 0.
        message = struct.pack('<2BH', *[TriggerBox.deviceID, 4, 0])
        handle.write(message)
message = handle.read(size=4) handle.flush() if not message: return False return True def OutputEventData(self, eventData): # directly mark trigger with serial # eventData is an unsigned short assert isinstance(eventData, int) msg = struct.pack('