1
0

trigger_box.py 9.1 KB


  1. import socket
  2. import struct
  3. import serial
  4. from serial.tools.list_ports import comports
  5. class AttrDict(dict):
  6. def __init__(self, *args, **kwargs):
  7. super(AttrDict, self).__init__(*args, **kwargs)
  8. self.__dict__ = self
  9. class TriggerNeuracle:
  10. triggerbox = None
  11. dotSize = 50
  12. def __init__(self, **kwargs):
  13. super(TriggerNeuracle, self).__init__()
  14. self.kwargs = kwargs
  15. # initiate triggerbox
  16. self.triggerbox = TriggerBox()
  17. def send_trigger(self, data):
  18. # directly using serial port
  19. self.triggerbox.OutputEventData(data)
  20. class TriggerBox(object):
  21. """docstring for TriggerBox"""
  22. functionIDSensorParaGet = 1
  23. functionIDSensorParaSet = 2
  24. functionIDDeviceInfoGet = 3
  25. functionIDDeviceNameGet = 4
  26. functionIDSensorSampleGet = 5
  27. functionIDSensorInfoGet = 6
  28. functionIDOutputEventData = 225
  29. functionIDError = 131
  30. sensorTypeDigitalIN = 1
  31. sensorTypeLight = 2
  32. sensorTypeLineIN = 3
  33. sensorTypeMic = 4
  34. sensorTypeKey = 5
  35. sensorTypeTemperature = 6
  36. sensorTypeHumidity = 7
  37. sensorTypeAmbientlight = 8
  38. sensorTypeDebug = 9
  39. sensorTypeAll = 255
  40. deviceID = 1
  41. # TODO: get device ID
  42. # properties
  43. comportHandle = None
  44. deviceName = None
  45. deviceInfo = None
  46. sensorInfo = None
  47. tcpOutput = None
  48. def __init__(self, port=None, tcpPort=None):
  49. if tcpPort is not None:
  50. self.tcpOutput = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  51. self.tcpOutput.connect(('localhost', tcpPort))
  52. if port is None:
  53. plist = comports()
  54. if not plist:
  55. raise Exception('No available port')
  56. validPort = None
  57. for p in plist:
  58. port = p.device
  59. if 'cu.usbserial' in port or 'COM' in port:
  60. isValidDevice = TriggerBox.isValidDevice(port)
  61. if isValidDevice:
  62. validPort = port
  63. break
  64. if validPort is None:
  65. raise Exception('No available port')
  66. self.comportHandle = serial.Serial(port, 115200, timeout=0.05)
  67. self.comportHandle.flush()
  68. self.GetDeviceName()
  69. self.GetDeviceInfo()
  70. self.GetSensorInfo()
  71. @staticmethod
  72. def isValidDevice(portName):
  73. '''
  74. ValidateDevice
  75. '''
  76. handle = serial.Serial(portName, 115200, timeout=0.05)
  77. handle.flush()
  78. # send device message
  79. message = struct.pack('<2BH', *[TriggerBox.deviceID, 4, 0])
  80. handle.write(message)
  81. message = handle.read(size=4)
  82. handle.flush()
  83. if not message:
  84. return False
  85. return True
  86. def OutputEventData(self, eventData):
  87. # directly mark trigger with serial
  88. # eventData is an unsigned short
  89. assert isinstance(eventData, int)
  90. msg = struct.pack('<H', eventData)
  91. self.SendCommand(self.functionIDOutputEventData, msg)
  92. resp = self.ReadResponse(self.functionIDOutputEventData)
  93. if self.tcpOutput is not None:
  94. self.tcpOutput.send(resp)
  95. def SetEventData(self, sensorID, eventData, triggerToBeOut=1):
  96. assert isinstance(eventData, int)
  97. sensorPara = self.GetSensorPara(sensorID)
  98. sensorPara.TriggerToBeOut = triggerToBeOut
  99. sensorPara.EventData = eventData
  100. self.SetSensorPara(sensorID, sensorPara)
  101. def GetDeviceName(self):
  102. self.SendCommand(self.functionIDDeviceNameGet, 1)
  103. name = self.ReadResponse(self.functionIDDeviceNameGet)
  104. name = name.decode()
  105. self.deviceName = name
  106. return name
  107. def GetDeviceInfo(self):
  108. self.SendCommand(self.functionIDDeviceInfoGet, 1)
  109. info = self.ReadResponse(self.functionIDDeviceInfoGet)
  110. deviceInfo = AttrDict({
  111. 'HardwareVersion': info[0],
  112. 'FirmwareVersion': info[1],
  113. 'SensorSum': info[2],
  114. 'ID': struct.unpack('<I', info[4:])
  115. })
  116. self.deviceInfo = deviceInfo
  117. return deviceInfo
  118. def GetSensorInfo(self):
  119. switch = {
  120. self.sensorTypeDigitalIN: 'DigitalIN',
  121. self.sensorTypeLight: 'Light',
  122. self.sensorTypeLineIN: 'LineIN',
  123. self.sensorTypeMic: 'Mic',
  124. self.sensorTypeKey: 'Key',
  125. self.sensorTypeTemperature: 'Temperature',
  126. self.sensorTypeHumidity: 'Humidity',
  127. self.sensorTypeAmbientlight: 'Ambientlight',
  128. self.sensorTypeDebug: 'Debug'
  129. }
  130. self.SendCommand(self.functionIDSensorInfoGet)
  131. info = self.ReadResponse(self.functionIDSensorInfoGet)
  132. sensorInfo = []
  133. for i in range(0, len(info), 2):
  134. # print(info[i], info[i+1])
  135. sensor_type = info[i]
  136. try:
  137. sensorType = switch[sensor_type]
  138. except KeyError:
  139. sensorType = 'Undefined'
  140. # print('Undefined sensor type')
  141. sensorNum = info[i + 1]
  142. sensorInfo.append(AttrDict(Type=sensorType, Number=sensorNum))
  143. self.sensorInfo = sensorInfo
  144. return sensorInfo
  145. def GetSensorPara(self, sensorID):
  146. sensor = self.sensorInfo[sensorID]
  147. cmd = [self.SensorType(sensor.Type), sensor.Number]
  148. cmd = struct.pack('<2B', *cmd)
  149. self.SendCommand(self.functionIDSensorParaGet, cmd)
  150. para = self.ReadResponse(self.functionIDSensorParaGet)
  151. para = struct.unpack('<2B3H', para)
  152. sensorPara = AttrDict({
  153. 'Edge': para[0],
  154. 'OutputChannel': para[1],
  155. 'TriggerToBeOut': para[2],
  156. 'Threshold': para[3],
  157. 'EventData': para[4]
  158. })
  159. return sensorPara
  160. def SetSensorPara(self, sensorID, sensorPara):
  161. sensor = self.sensorInfo[sensorID]
  162. cmd = [self.SensorType(sensor.Type), sensor.Number] + [sensorPara[key]
  163. for key in sensorPara.keys()]
  164. cmd = struct.pack('<4B3H', *cmd)
  165. self.SendCommand(self.functionIDSensorParaSet, cmd)
  166. resp = self.ReadResponse(self.functionIDSensorParaSet)
  167. isSucceed = (resp[0] == self.SensorType(sensor.Type)) and (resp[1] == sensor.Number)
  168. return isSucceed
  169. def GetSensorSample(self, sensorID):
  170. sensor = self.sensorInfo[sensorID]
  171. cmd = [self.SensorType(sensor.Type), sensor.Number]
  172. self.SendCommand(self.functionIDSensorSampleGet, struct.pack('<2B', *cmd))
  173. result = self.ReadResponse(self.functionIDSensorSampleGet)
  174. if result[0] != self.SensorType(sensor.Type) or result[1] != sensor.Number:
  175. raise Exception('Get sensor sample error')
  176. adcResult = struct.unpack('<H', result[2:])[0]
  177. return adcResult
  178. def SensorType(self, typeString):
  179. switch = {
  180. 'DigitalIN': self.sensorTypeDigitalIN,
  181. 'Light': self.sensorTypeLight,
  182. 'LineIN': self.sensorTypeLineIN,
  183. 'Mic': self.sensorTypeMic,
  184. 'Key': self.sensorTypeKey,
  185. 'Temperature': self.sensorTypeTemperature,
  186. 'Humidity': self.sensorTypeHumidity,
  187. 'Ambientlight': self.sensorTypeAmbientlight,
  188. 'Debug': self.sensorTypeDebug
  189. }
  190. try:
  191. typeNum = switch[typeString]
  192. except KeyError:
  193. raise Exception('Undefined sensor type')
  194. return typeNum
  195. def SendCommand(self, functionID, command=None):
  196. if command is not None:
  197. # process command data structure
  198. if isinstance(command, int):
  199. command = struct.pack('<B', command)
  200. # make sure command finally becomes 'bytes'
  201. assert isinstance(command, bytes)
  202. payload = len(command)
  203. else:
  204. payload = 0
  205. value = (self.deviceID, functionID, payload)
  206. message = struct.pack('<2BH', *value)
  207. if command is not None:
  208. message += command
  209. self.comportHandle.write(message)
  210. def ReadResponse(self, functionID):
  211. errorCases = {
  212. 0: 'None',
  213. 1: 'FrameHeader',
  214. 2: 'FramePayload',
  215. 3: 'ChannelNotExist',
  216. 4: 'DeviceID',
  217. 5: 'FunctionID',
  218. 6: 'SensorType'
  219. }
  220. message = self.comportHandle.read(4)
  221. message = struct.unpack('<2BH', message)
  222. if message[0] != self.deviceID:
  223. raise Exception('Response error: request deviceID %d, \
  224. return deviceID %d', self.deviceID, message[0])
  225. if message[1] != functionID:
  226. if message[1] == self.functionIDError:
  227. errorType = self.comportHandle.read(1)
  228. try:
  229. errorMessage = errorCases[errorType]
  230. except KeyError:
  231. raise Exception('Undefined error type')
  232. raise Exception('Response error: ', errorMessage)
  233. else:
  234. raise Exception('Response error: request functionID %d, \
  235. return functionID %d', functionID, message[1])
  236. payload = message[2]
  237. DataBuf = self.comportHandle.read(payload)
  238. return DataBuf