fubo_mechanical_finger.py 11 KB


  1. '''
  2. @Author : gongchanghui
  3. @Ide : vscode
  4. @File : fubo_mechanical_finger.py
  5. @Time : 2023/08/29 16:49:11
  6. '''
  7. import logging
  8. import time
  9. import enum
  10. import threading
  11. import serial
  12. from serial.tools.list_ports import comports
  13. from core.peripheral.hand.base import PeripheralHandBase
  14. from settings.config import settings
  15. mechanical_finger_config = settings.config["hand_peripheral_parameter"]
  16. logger = logging.getLogger(__name__)
  17. def get_serial_ports():
  18. """获取可选端口"""
  19. ports = list(comports(include_links=False))
  20. available_ports_list = [port.device for port in ports]
  21. return available_ports_list
  22. class DeviceStatus(enum.Enum):
  23. NotOpend = 0
  24. Opened = 1
  25. DeviceRecieved = 2
  26. RMTC_GLOVE_Sended=3
  27. Get_Recieved=4
  28. Running=5
  29. Invalid=-1
  30. class RecievedPackageType(enum.Enum):
  31. Device_Package = 0
  32. Get_Package = 1
  33. Other_Package = 2
  34. No_Package = 3
  35. class FuboMechanicalFingerConnector:
  36. """富伯客户端
  37. 功能:连接;发送持续控制包维持链接;开启线程发送信号;接收信号等
  38. """
  39. def __init__(self, port) -> None:
  40. self.__port = port
  41. self.__heart_interval = 0.1
  42. self.__serial = None
  43. self.__baud_rate = 57600
  44. self.__data_bite = 8
  45. self.__timeout = 1
  46. self.__stop_bit = serial.STOPBITS_ONE # 停止位
  47. self.__parity_bit = serial.PARITY_NONE # 校验位
  48. self.__extend = False
  49. self.__extend_target_time = time.time()-1
  50. self.__thumb = 60
  51. self.__index_finger = 60
  52. self.__middle_finger = 60
  53. self.__ring_finger = 60
  54. self.__little_finger = 60
  55. self.__duration = 10
  56. self.is_connected = False
  57. self.stop_flag = False
  58. def connect(self):
  59. if self.__serial is not None:
  60. self.__serial.close()
  61. self.__serial = serial.Serial(port=self.__port,
  62. baudrate=self.__baud_rate,
  63. parity=self.__parity_bit,
  64. stopbits=self.__stop_bit,
  65. timeout=self.__timeout)
  66. if self.__serial.is_open:
  67. logger.info("Open Fubo Mechanical Finger Device Failed...")
  68. else:
  69. logger.warning("Open Fubo Mechanical Finger Device Failed...")
  70. return self.__serial.is_open
  71. def start_client(self):
  72. """启动客户端"""
  73. try:
  74. self.is_connected = self.connect()
  75. except Exception:
  76. return False
  77. self.stop_flag = False
  78. if not self.is_connected:
  79. return False
  80. self.__extend_target_time = time.time()+2
  81. # 向服务端发送心跳包
  82. working_thread = threading.Thread(target=self.__working_thrend_func,
  83. args=())
  84. working_thread.start()
  85. return True
  86. def create_send_t(self, send_data):
  87. """外部程序调用"""
  88. send_t = threading.Thread(target=self.send_data, args=(send_data,))
  89. send_t.start()
  90. send_t.join()
  91. def send_data(self, cmd, update=None):
  92. if update is None:
  93. update = dict()
  94. send_data = self.protocol.get_pck(cmd, update)
  95. self.sock.sendall(send_data)
  96. def sync_send_data(self, cmd, update=None):
  97. """同步接口"""
  98. if update is None:
  99. update = dict()
  100. send_data = self.protocol.get_pck(cmd, update)
  101. if send_data:
  102. self.sock.send(send_data)
  103. res = self.filter_recv_msg()
  104. return res
  105. else:
  106. return None
  107. def extend(self, thumb, index_finger, middle_finger, ring_finger, little_finger, duration):
  108. self.__extend_target_time = time.time()+int(duration)
  109. self.__thumb = int(thumb)
  110. self.__index_finger = index_finger
  111. self.__middle_finger = middle_finger
  112. self.__ring_finger = ring_finger
  113. self.__little_finger = little_finger
  114. self.__duration = duration
  115. def flex(self):
  116. self.__extend_target_time = time.time()-1
  117. def __working_thrend_func(self):
  118. device_status = DeviceStatus.Opened
  119. last_device_status = device_status
  120. command_count = 0
  121. time_last_receive_package = time.time()
  122. while self.stop_flag is not True:
  123. try:
  124. if not self.__serial.is_open:
  125. self.is_connected = self.connect()
  126. data_count_in_buffer = self.__serial.in_waiting
  127. in_data = None
  128. in_data_str = None
  129. package_type = RecievedPackageType.No_Package
  130. if data_count_in_buffer > 0:
  131. now = time.time()
  132. if now - time_last_receive_package > 3:
  133. device_status = DeviceStatus.Opened
  134. in_data = self.__serial.read(data_count_in_buffer)
  135. in_data_str = str(in_data, encoding="utf-8")
  136. if data_count_in_buffer >= 3 and "get" in in_data_str:
  137. package_type = RecievedPackageType.Get_Package
  138. time_last_receive_package = now
  139. elif data_count_in_buffer >= 5 and "device" in in_data_str:
  140. package_type = RecievedPackageType.Device_Package
  141. time_last_receive_package = now
  142. self.__extend_target_time = time.time()+2
  143. else:
  144. package_type = RecievedPackageType.Other_Package
  145. if last_device_status != device_status:
  146. print(device_status)
  147. last_device_status = device_status
  148. #if data_count_in_buffer > 0:
  149. #print(device_status)
  150. #print_hex("recv data:", in_data)
  151. if device_status is DeviceStatus.NotOpend:
  152. pass
  153. elif device_status is DeviceStatus.Opened:
  154. if data_count_in_buffer <= 0:
  155. continue
  156. if package_type is RecievedPackageType.Device_Package:
  157. print("**** device command recieved***")
  158. device_status = DeviceStatus.DeviceRecieved
  159. elif device_status is DeviceStatus.DeviceRecieved:
  160. self.__serial.write(b"RMTC_GLOVE\r")
  161. device_status = DeviceStatus.RMTC_GLOVE_Sended
  162. pass
  163. elif device_status is DeviceStatus.RMTC_GLOVE_Sended:
  164. if data_count_in_buffer <= 0:
  165. continue
  166. if package_type is RecievedPackageType.Get_Package:
  167. print("**** get recieved***")
  168. device_status = DeviceStatus.Get_Recieved
  169. elif device_status is DeviceStatus.Get_Recieved:
  170. self.__serial.write(b"DATA:0:40:0:40:0:40:0:40:0:40\r")
  171. device_status = DeviceStatus.Running
  172. elif device_status is DeviceStatus.Running:
  173. if now > self.__extend_target_time:
  174. self.__serial.write(b"DATA:0:00:0:00:0:00:0:00:0:00\r")
  175. else:
  176. cmd_str = "DATA:0:{0}:0:{1}:0:{2}:0:{3}:0:{4}\r".format(self.__thumb,
  177. self.__index_finger,
  178. self.__middle_finger,
  179. self.__ring_finger,
  180. self.__little_finger)
  181. self.__serial.write(cmd_str.encode('UTF-8'))
  182. elif device_status is DeviceStatus.Invalid:
  183. pass
  184. if package_type is RecievedPackageType.Device_Package and \
  185. device_status is not DeviceStatus.Opened and \
  186. device_status is not DeviceStatus.DeviceRecieved and \
  187. device_status is not DeviceStatus.RMTC_GLOVE_Sended:
  188. print("**** device command recieved***")
  189. device_status = DeviceStatus.DeviceRecieved
  190. except Exception as e:
  191. logger.info(
  192. "Send beat data failed, Hand Peripheral may be disconnected.")
  193. self.is_connected = False
  194. time.sleep(self.__heart_interval)
  195. if self.__serial is not None and self.__serial.is_open:
  196. self.__serial.close()
  197. self.__serial = None
  198. def close_client(self):
  199. self.stop_flag = True
  200. class FuboMechanicalFingerClient(PeripheralHandBase):
  201. """富伯机械手客户端"""
  202. def __init__(self, init_params=None):
  203. if init_params:
  204. self.port = init_params["port"]
  205. self.__thumb = int(init_params["thumb"])
  206. self.__index_finger = int(init_params["index_finger"])
  207. self.__middle_finger = int(init_params["middle_finger"])
  208. self.__ring_finger = int(init_params["ring_finger"])
  209. self.__little_finger = int(init_params["little_finger"])
  210. self.__duration = int(init_params["duration"])
  211. else:
  212. self.port = "COM12"
  213. self.__thumb = 60
  214. self.__index_finger = 60
  215. self.__middle_finger = 60
  216. self.__ring_finger = 60
  217. self.__little_finger = 60
  218. self.__duration = 10
  219. self.connector = FuboMechanicalFingerConnector(self.port)
  220. def __del__(self):
  221. if self.connector is not None:
  222. self.connector.close_client()
  223. self.connector = None
  224. def init(self):
  225. ret = self.connector.start_client()
  226. return {"is_connected": self.connector.is_connected, "msg": ret}
  227. def start(self, train=None):
  228. self.connector.extend(self.__thumb,
  229. self.__index_finger,
  230. self.__middle_finger,
  231. self.__ring_finger,
  232. self.__little_finger,
  233. self.__duration)
  234. return 1
  235. def stop(self):
  236. self.connector.flex()
  237. return 1
  238. def status(self):
  239. status = {"is_connected": self.connector.is_connected}
  240. return status
  241. def close(self):
  242. if self.connector:
  243. self.connector.close_client()
  244. self.connector = None
  245. return {"is_connected": False}