fubo_pneumatic_finger.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import logging
  2. import serial
  3. from serial.tools.list_ports import comports
  4. logger = logging.getLogger(__name__)
  5. def get_serial_ports():
  6. """获取可选端口"""
  7. ports = list(comports(include_links=False))
  8. available_ports_list = [port.device for port in ports]
  9. return available_ports_list
  10. class FingerController:
  11. def __init__(self, mode='step', init_params=dict()):
  12. self.hand = FuboPneumaticFingerClient(init_params)
  13. self.current_state = 'rest'
  14. if mode not in ['edge', 'step']:
  15. raise ValueError('mode must be in \{"edge", "step"\}')
  16. self.mode = mode
  17. def move(self, action='flex'):
  18. if self.mode == 'edge':
  19. self._edge(action)
  20. elif self.mode == 'step':
  21. self._step(action)
  22. def _step(self, action='flex'):
  23. if action == 'rest':
  24. self.hand.start('extend')
  25. else:
  26. self.hand.start(action)
  27. self.current_state = action
  28. def _edge(self, action='flex'):
  29. if action == 'rest':
  30. return
  31. if self.current_state == 'extend':
  32. self.hand.start(action)
  33. self.current_state = action
  34. elif self.current_state == action:
  35. self.hand.start('extend')
  36. self.current_state = 'extend'
  37. else: # rest or any other
  38. self.hand.start(action)
  39. self.current_state = action
  40. class FuboPneumaticFingerClient:
  41. """富伯客户端"""
  42. COMMAND_TABLE = {
  43. 'rest': b"R",
  44. 'cylinder': b"C",
  45. 'ball': b"B",
  46. 'flex': b"F",
  47. 'double': b"D",
  48. 'treble': b"T",
  49. 'extend': b"E",
  50. }
  51. def __init__(self, init_params=None):
  52. self.baud_rate = 9600
  53. self.data_bite = 8
  54. self.timeout = 1
  55. self.stop_bit = serial.STOPBITS_ONE # 停止位
  56. self.parity_bit = serial.PARITY_NONE # 校验位
  57. self.is_connected = False
  58. self.ser = None # 连接对象
  59. self.port = "COM 4"
  60. if init_params:
  61. self.port = init_params["port"]
  62. self.connect()
  63. def __del__(self):
  64. self.ser.close()
  65. def connect(self):
  66. try:
  67. self.ser = serial.Serial(port=self.port,
  68. baudrate=self.baud_rate,
  69. parity=self.parity_bit,
  70. stopbits=self.stop_bit,
  71. timeout=self.timeout)
  72. if self.ser.is_open:
  73. self.is_connected = True
  74. logger.info("connect")
  75. return 1
  76. else:
  77. self.is_connected = False
  78. logger.warning("open failed")
  79. return 0
  80. except OSError as e:
  81. warning_info = f"pneumatic finger connect failed: {e}"
  82. logger.warning(warning_info)
  83. return 0
  84. def start(self, command):
  85. self.ser.write(self.COMMAND_TABLE[command])
  86. def status(self):
  87. status = {"is_connected": self.is_connected}
  88. return status
  89. def close(self):
  90. if self.ser:
  91. self.ser.close()
  92. self.is_connected = False
  93. return {"is_connected": self.is_connected}