trigger_box.py 8.0 KB


  1. import socket
  2. import struct
  3. import serial
  4. class AttrDict(dict):
  5. def __init__(self, *args, **kwargs):
  6. super(AttrDict, self).__init__(*args, **kwargs)
  7. self.__dict__ = self
  8. class TriggerNeuracle:
  9. def __init__(self, port, **kwargs):
  10. self.kwargs = kwargs
  11. # initiate triggerbox
  12. self.triggerbox = TriggerBox(port=port)
  13. def send_trigger(self, data):
  14. # directly using serial port
  15. self.triggerbox.OutputEventData(data)
  16. class TriggerBox(object):
  17. """docstring for TriggerBox"""
  18. functionIDSensorParaGet = 1
  19. functionIDSensorParaSet = 2
  20. functionIDDeviceInfoGet = 3
  21. functionIDDeviceNameGet = 4
  22. functionIDSensorSampleGet = 5
  23. functionIDSensorInfoGet = 6
  24. functionIDOutputEventData = 225
  25. functionIDError = 131
  26. sensorTypeDigitalIN = 1
  27. sensorTypeLight = 2
  28. sensorTypeLineIN = 3
  29. sensorTypeMic = 4
  30. sensorTypeKey = 5
  31. sensorTypeTemperature = 6
  32. sensorTypeHumidity = 7
  33. sensorTypeAmbientlight = 8
  34. sensorTypeDebug = 9
  35. sensorTypeAll = 255
  36. deviceID = 1
  37. # TODO: get device ID
  38. # properties
  39. comportHandle = None
  40. deviceName = None
  41. deviceInfo = None
  42. sensorInfo = None
  43. tcpOutput = None
  44. def __init__(self, port=None, tcpPort=None):
  45. if tcpPort is not None:
  46. self.tcpOutput = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  47. self.tcpOutput.connect(('localhost', tcpPort))
  48. self.comportHandle = serial.Serial(port, 115200, timeout=0.05)
  49. self.comportHandle.flush()
  50. self.GetDeviceName()
  51. self.GetDeviceInfo()
  52. self.GetSensorInfo()
  53. def OutputEventData(self, eventData):
  54. # directly mark trigger with serial
  55. # eventData is an unsigned short
  56. assert isinstance(eventData, int)
  57. msg = struct.pack('<H', eventData)
  58. self.SendCommand(self.functionIDOutputEventData, msg)
  59. resp = self.ReadResponse(self.functionIDOutputEventData)
  60. if self.tcpOutput is not None:
  61. self.tcpOutput.send(resp)
  62. def SetEventData(self, sensorID, eventData, triggerToBeOut=1):
  63. assert isinstance(eventData, int)
  64. sensorPara = self.GetSensorPara(sensorID)
  65. sensorPara.TriggerToBeOut = triggerToBeOut
  66. sensorPara.EventData = eventData
  67. self.SetSensorPara(sensorID, sensorPara)
  68. def GetDeviceName(self):
  69. self.SendCommand(self.functionIDDeviceNameGet, 1)
  70. name = self.ReadResponse(self.functionIDDeviceNameGet)
  71. name = name.decode()
  72. self.deviceName = name
  73. return name
  74. def GetDeviceInfo(self):
  75. self.SendCommand(self.functionIDDeviceInfoGet, 1)
  76. info = self.ReadResponse(self.functionIDDeviceInfoGet)
  77. deviceInfo = AttrDict({
  78. 'HardwareVersion': info[0],
  79. 'FirmwareVersion': info[1],
  80. 'SensorSum': info[2],
  81. 'ID': struct.unpack('<I', info[4:])
  82. })
  83. self.deviceInfo = deviceInfo
  84. return deviceInfo
  85. def GetSensorInfo(self):
  86. switch = {
  87. self.sensorTypeDigitalIN: 'DigitalIN',
  88. self.sensorTypeLight: 'Light',
  89. self.sensorTypeLineIN: 'LineIN',
  90. self.sensorTypeMic: 'Mic',
  91. self.sensorTypeKey: 'Key',
  92. self.sensorTypeTemperature: 'Temperature',
  93. self.sensorTypeHumidity: 'Humidity',
  94. self.sensorTypeAmbientlight: 'Ambientlight',
  95. self.sensorTypeDebug: 'Debug'
  96. }
  97. self.SendCommand(self.functionIDSensorInfoGet)
  98. info = self.ReadResponse(self.functionIDSensorInfoGet)
  99. sensorInfo = []
  100. for i in range(0, len(info), 2):
  101. # print(info[i], info[i+1])
  102. sensor_type = info[i]
  103. try:
  104. sensorType = switch[sensor_type]
  105. except KeyError:
  106. sensorType = 'Undefined'
  107. # print('Undefined sensor type')
  108. sensorNum = info[i + 1]
  109. sensorInfo.append(AttrDict(Type=sensorType, Number=sensorNum))
  110. self.sensorInfo = sensorInfo
  111. return sensorInfo
  112. def GetSensorPara(self, sensorID):
  113. sensor = self.sensorInfo[sensorID]
  114. cmd = [self.SensorType(sensor.Type), sensor.Number]
  115. cmd = struct.pack('<2B', *cmd)
  116. self.SendCommand(self.functionIDSensorParaGet, cmd)
  117. para = self.ReadResponse(self.functionIDSensorParaGet)
  118. para = struct.unpack('<2B3H', para)
  119. sensorPara = AttrDict({
  120. 'Edge': para[0],
  121. 'OutputChannel': para[1],
  122. 'TriggerToBeOut': para[2],
  123. 'Threshold': para[3],
  124. 'EventData': para[4]
  125. })
  126. return sensorPara
  127. def SetSensorPara(self, sensorID, sensorPara):
  128. sensor = self.sensorInfo[sensorID]
  129. cmd = [self.SensorType(sensor.Type), sensor.Number] + [sensorPara[key]
  130. for key in sensorPara.keys()]
  131. cmd = struct.pack('<4B3H', *cmd)
  132. self.SendCommand(self.functionIDSensorParaSet, cmd)
  133. resp = self.ReadResponse(self.functionIDSensorParaSet)
  134. isSucceed = (resp[0] == self.SensorType(sensor.Type)) and (resp[1] == sensor.Number)
  135. return isSucceed
  136. def GetSensorSample(self, sensorID):
  137. sensor = self.sensorInfo[sensorID]
  138. cmd = [self.SensorType(sensor.Type), sensor.Number]
  139. self.SendCommand(self.functionIDSensorSampleGet, struct.pack('<2B', *cmd))
  140. result = self.ReadResponse(self.functionIDSensorSampleGet)
  141. if result[0] != self.SensorType(sensor.Type) or result[1] != sensor.Number:
  142. raise Exception('Get sensor sample error')
  143. adcResult = struct.unpack('<H', result[2:])[0]
  144. return adcResult
  145. def SensorType(self, typeString):
  146. switch = {
  147. 'DigitalIN': self.sensorTypeDigitalIN,
  148. 'Light': self.sensorTypeLight,
  149. 'LineIN': self.sensorTypeLineIN,
  150. 'Mic': self.sensorTypeMic,
  151. 'Key': self.sensorTypeKey,
  152. 'Temperature': self.sensorTypeTemperature,
  153. 'Humidity': self.sensorTypeHumidity,
  154. 'Ambientlight': self.sensorTypeAmbientlight,
  155. 'Debug': self.sensorTypeDebug
  156. }
  157. try:
  158. typeNum = switch[typeString]
  159. except KeyError:
  160. raise Exception('Undefined sensor type')
  161. return typeNum
  162. def SendCommand(self, functionID, command=None):
  163. if command is not None:
  164. # process command data structure
  165. if isinstance(command, int):
  166. command = struct.pack('<B', command)
  167. # make sure command finally becomes 'bytes'
  168. assert isinstance(command, bytes)
  169. payload = len(command)
  170. else:
  171. payload = 0
  172. value = (self.deviceID, functionID, payload)
  173. message = struct.pack('<2BH', *value)
  174. if command is not None:
  175. message += command
  176. self.comportHandle.write(message)
  177. def ReadResponse(self, functionID):
  178. errorCases = {
  179. 0: 'None',
  180. 1: 'FrameHeader',
  181. 2: 'FramePayload',
  182. 3: 'ChannelNotExist',
  183. 4: 'DeviceID',
  184. 5: 'FunctionID',
  185. 6: 'SensorType'
  186. }
  187. message = self.comportHandle.read(4)
  188. message = struct.unpack('<2BH', message)
  189. if message[0] != self.deviceID:
  190. raise Exception('Response error: request deviceID %d, \
  191. return deviceID %d', self.deviceID, message[0])
  192. if message[1] != functionID:
  193. if message[1] == self.functionIDError:
  194. errorType = self.comportHandle.read(1)
  195. try:
  196. errorMessage = errorCases[errorType]
  197. except KeyError:
  198. raise Exception('Undefined error type')
  199. raise Exception('Response error: ', errorMessage)
  200. else:
  201. raise Exception('Response error: request functionID %d, \
  202. return functionID %d', functionID, message[1])
  203. payload = message[2]
  204. DataBuf = self.comportHandle.read(payload)
  205. return DataBuf