1
0

trigger_box.py 9.0 KB


  1. import socket
  2. import struct
  3. import serial
  4. from serial.tools.list_ports import comports
  5. class AttrDict(dict):
  6. def __init__(self, *args, **kwargs):
  7. super(AttrDict, self).__init__(*args, **kwargs)
  8. self.__dict__ = self
  9. class TriggerNeuracle:
  10. def __init__(self, port=None, **kwargs):
  11. self.kwargs = kwargs
  12. # initiate triggerbox
  13. self.triggerbox = TriggerBox(port=port)
  14. def send_trigger(self, data):
  15. # directly using serial port
  16. self.triggerbox.OutputEventData(data)
  17. class TriggerBox(object):
  18. """docstring for TriggerBox"""
  19. functionIDSensorParaGet = 1
  20. functionIDSensorParaSet = 2
  21. functionIDDeviceInfoGet = 3
  22. functionIDDeviceNameGet = 4
  23. functionIDSensorSampleGet = 5
  24. functionIDSensorInfoGet = 6
  25. functionIDOutputEventData = 225
  26. functionIDError = 131
  27. sensorTypeDigitalIN = 1
  28. sensorTypeLight = 2
  29. sensorTypeLineIN = 3
  30. sensorTypeMic = 4
  31. sensorTypeKey = 5
  32. sensorTypeTemperature = 6
  33. sensorTypeHumidity = 7
  34. sensorTypeAmbientlight = 8
  35. sensorTypeDebug = 9
  36. sensorTypeAll = 255
  37. deviceID = 1
  38. # TODO: get device ID
  39. # properties
  40. comportHandle = None
  41. deviceName = None
  42. deviceInfo = None
  43. sensorInfo = None
  44. tcpOutput = None
  45. def __init__(self, port=None, tcpPort=None):
  46. if tcpPort is not None:
  47. self.tcpOutput = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  48. self.tcpOutput.connect(('localhost', tcpPort))
  49. if port is None:
  50. plist = comports()
  51. if not plist:
  52. raise Exception('No available port')
  53. validPort = None
  54. for p in plist:
  55. port = p.device
  56. if 'cu.usbserial' in port or 'COM' in port:
  57. isValidDevice = TriggerBox.isValidDevice(port)
  58. if isValidDevice:
  59. validPort = port
  60. break
  61. if validPort is None:
  62. raise Exception('No available port')
  63. self.comportHandle = serial.Serial(port, 115200, timeout=0.05)
  64. self.comportHandle.flush()
  65. self.GetDeviceName()
  66. self.GetDeviceInfo()
  67. self.GetSensorInfo()
  68. @staticmethod
  69. def isValidDevice(portName):
  70. '''
  71. ValidateDevice
  72. '''
  73. handle = serial.Serial(portName, 115200, timeout=0.05)
  74. handle.flush()
  75. # send device message
  76. message = struct.pack('<2BH', *[TriggerBox.deviceID, 4, 0])
  77. handle.write(message)
  78. message = handle.read(size=4)
  79. handle.flush()
  80. if not message:
  81. return False
  82. return True
  83. def OutputEventData(self, eventData):
  84. # directly mark trigger with serial
  85. # eventData is an unsigned short
  86. assert isinstance(eventData, int)
  87. msg = struct.pack('<H', eventData)
  88. self.SendCommand(self.functionIDOutputEventData, msg)
  89. resp = self.ReadResponse(self.functionIDOutputEventData)
  90. if self.tcpOutput is not None:
  91. self.tcpOutput.send(resp)
  92. def SetEventData(self, sensorID, eventData, triggerToBeOut=1):
  93. assert isinstance(eventData, int)
  94. sensorPara = self.GetSensorPara(sensorID)
  95. sensorPara.TriggerToBeOut = triggerToBeOut
  96. sensorPara.EventData = eventData
  97. self.SetSensorPara(sensorID, sensorPara)
  98. def GetDeviceName(self):
  99. self.SendCommand(self.functionIDDeviceNameGet, 1)
  100. name = self.ReadResponse(self.functionIDDeviceNameGet)
  101. name = name.decode()
  102. self.deviceName = name
  103. return name
  104. def GetDeviceInfo(self):
  105. self.SendCommand(self.functionIDDeviceInfoGet, 1)
  106. info = self.ReadResponse(self.functionIDDeviceInfoGet)
  107. deviceInfo = AttrDict({
  108. 'HardwareVersion': info[0],
  109. 'FirmwareVersion': info[1],
  110. 'SensorSum': info[2],
  111. 'ID': struct.unpack('<I', info[4:])
  112. })
  113. self.deviceInfo = deviceInfo
  114. return deviceInfo
  115. def GetSensorInfo(self):
  116. switch = {
  117. self.sensorTypeDigitalIN: 'DigitalIN',
  118. self.sensorTypeLight: 'Light',
  119. self.sensorTypeLineIN: 'LineIN',
  120. self.sensorTypeMic: 'Mic',
  121. self.sensorTypeKey: 'Key',
  122. self.sensorTypeTemperature: 'Temperature',
  123. self.sensorTypeHumidity: 'Humidity',
  124. self.sensorTypeAmbientlight: 'Ambientlight',
  125. self.sensorTypeDebug: 'Debug'
  126. }
  127. self.SendCommand(self.functionIDSensorInfoGet)
  128. info = self.ReadResponse(self.functionIDSensorInfoGet)
  129. sensorInfo = []
  130. for i in range(0, len(info), 2):
  131. # print(info[i], info[i+1])
  132. sensor_type = info[i]
  133. try:
  134. sensorType = switch[sensor_type]
  135. except KeyError:
  136. sensorType = 'Undefined'
  137. # print('Undefined sensor type')
  138. sensorNum = info[i + 1]
  139. sensorInfo.append(AttrDict(Type=sensorType, Number=sensorNum))
  140. self.sensorInfo = sensorInfo
  141. return sensorInfo
  142. def GetSensorPara(self, sensorID):
  143. sensor = self.sensorInfo[sensorID]
  144. cmd = [self.SensorType(sensor.Type), sensor.Number]
  145. cmd = struct.pack('<2B', *cmd)
  146. self.SendCommand(self.functionIDSensorParaGet, cmd)
  147. para = self.ReadResponse(self.functionIDSensorParaGet)
  148. para = struct.unpack('<2B3H', para)
  149. sensorPara = AttrDict({
  150. 'Edge': para[0],
  151. 'OutputChannel': para[1],
  152. 'TriggerToBeOut': para[2],
  153. 'Threshold': para[3],
  154. 'EventData': para[4]
  155. })
  156. return sensorPara
  157. def SetSensorPara(self, sensorID, sensorPara):
  158. sensor = self.sensorInfo[sensorID]
  159. cmd = [self.SensorType(sensor.Type), sensor.Number] + [sensorPara[key]
  160. for key in sensorPara.keys()]
  161. cmd = struct.pack('<4B3H', *cmd)
  162. self.SendCommand(self.functionIDSensorParaSet, cmd)
  163. resp = self.ReadResponse(self.functionIDSensorParaSet)
  164. isSucceed = (resp[0] == self.SensorType(sensor.Type)) and (resp[1] == sensor.Number)
  165. return isSucceed
  166. def GetSensorSample(self, sensorID):
  167. sensor = self.sensorInfo[sensorID]
  168. cmd = [self.SensorType(sensor.Type), sensor.Number]
  169. self.SendCommand(self.functionIDSensorSampleGet, struct.pack('<2B', *cmd))
  170. result = self.ReadResponse(self.functionIDSensorSampleGet)
  171. if result[0] != self.SensorType(sensor.Type) or result[1] != sensor.Number:
  172. raise Exception('Get sensor sample error')
  173. adcResult = struct.unpack('<H', result[2:])[0]
  174. return adcResult
  175. def SensorType(self, typeString):
  176. switch = {
  177. 'DigitalIN': self.sensorTypeDigitalIN,
  178. 'Light': self.sensorTypeLight,
  179. 'LineIN': self.sensorTypeLineIN,
  180. 'Mic': self.sensorTypeMic,
  181. 'Key': self.sensorTypeKey,
  182. 'Temperature': self.sensorTypeTemperature,
  183. 'Humidity': self.sensorTypeHumidity,
  184. 'Ambientlight': self.sensorTypeAmbientlight,
  185. 'Debug': self.sensorTypeDebug
  186. }
  187. try:
  188. typeNum = switch[typeString]
  189. except KeyError:
  190. raise Exception('Undefined sensor type')
  191. return typeNum
  192. def SendCommand(self, functionID, command=None):
  193. if command is not None:
  194. # process command data structure
  195. if isinstance(command, int):
  196. command = struct.pack('<B', command)
  197. # make sure command finally becomes 'bytes'
  198. assert isinstance(command, bytes)
  199. payload = len(command)
  200. else:
  201. payload = 0
  202. value = (self.deviceID, functionID, payload)
  203. message = struct.pack('<2BH', *value)
  204. if command is not None:
  205. message += command
  206. self.comportHandle.write(message)
  207. def ReadResponse(self, functionID):
  208. errorCases = {
  209. 0: 'None',
  210. 1: 'FrameHeader',
  211. 2: 'FramePayload',
  212. 3: 'ChannelNotExist',
  213. 4: 'DeviceID',
  214. 5: 'FunctionID',
  215. 6: 'SensorType'
  216. }
  217. message = self.comportHandle.read(4)
  218. message = struct.unpack('<2BH', message)
  219. if message[0] != self.deviceID:
  220. raise Exception('Response error: request deviceID %d, \
  221. return deviceID %d', self.deviceID, message[0])
  222. if message[1] != functionID:
  223. if message[1] == self.functionIDError:
  224. errorType = self.comportHandle.read(1)
  225. try:
  226. errorMessage = errorCases[errorType]
  227. except KeyError:
  228. raise Exception('Undefined error type')
  229. raise Exception('Response error: ', errorMessage)
  230. else:
  231. raise Exception('Response error: request functionID %d, \
  232. return functionID %d', functionID, message[1])
  233. payload = message[2]
  234. DataBuf = self.comportHandle.read(payload)
  235. return DataBuf