fubo_pneumatic_finger.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. '''
  2. @Author : liujunshen
  3. @Ide : vscode
  4. @File : fubo_pneumatic_finger.py
  5. @Time : 2023/04/03 16:49:11
  6. '''
  7. import logging
  8. import time
  9. import serial
  10. from serial.tools.list_ports import comports
  11. from core.peripheral.hand.base import PeripheralHandBase
  12. logger = logging.getLogger(__name__)
  13. def get_serial_ports():
  14. """获取可选端口"""
  15. ports = list(comports(include_links=False))
  16. available_ports_list = [port.device for port in ports]
  17. return available_ports_list
  18. class FuboPneumaticFingerClient(PeripheralHandBase):
  19. """富伯客户端"""
  20. FLEX_CMD = b"F"
  21. EXTEND_CMD = b"E"
  22. BALL_CMD = b"B"
  23. CYLINDER_CMD = b"C"
  24. DOUBLE_CMD = b"D"
  25. TREBLE_CMD = b"T"
  26. def __init__(self, init_params=None):
  27. self.baud_rate = 9600
  28. self.data_bite = 8
  29. self.timeout = 1
  30. self.stop_bit = serial.STOPBITS_ONE # 停止位
  31. self.parity_bit = serial.PARITY_NONE # 校验位
  32. self.is_connected = False
  33. self.ser = None # 连接对象
  34. self.port = "COM 4"
  35. if init_params:
  36. self.port = init_params["port"]
  37. def __del__(self):
  38. self.ser.close()
  39. def connect(self):
  40. try:
  41. self.ser = serial.Serial(port=self.port,
  42. baudrate=self.baud_rate,
  43. parity=self.parity_bit,
  44. stopbits=self.stop_bit,
  45. timeout=self.timeout)
  46. if self.ser.is_open:
  47. self.is_connected = True
  48. logger.info("connect")
  49. return 1
  50. else:
  51. self.is_connected = False
  52. logger.warning("open failed")
  53. return 0
  54. except OSError as e:
  55. warning_info = f"pneumatic finger connect failed: {e}"
  56. logger.warning(warning_info)
  57. return 0
  58. def flex(self):
  59. self.ser.write(self.FLEX_CMD)
  60. return self.ser.read()
  61. def extend(self):
  62. self.ser.write(self.EXTEND_CMD)
  63. return self.ser.read()
  64. def reconnect(self):
  65. self.close()
  66. return self.connect()
  67. def init(self):
  68. ret = self.connect()
  69. return {"is_connected": self.is_connected, "msg": ret}
  70. def start(self, train=None):
  71. model = train.fubo_pneumatic_finger.pneumatic_finger_model
  72. if model == "flex":
  73. self.flex()
  74. elif model == "ball":
  75. self.ser.write(self.BALL_CMD)
  76. elif model == "cylinder":
  77. self.ser.write(self.CYLINDER_CMD)
  78. elif model == "double":
  79. self.ser.write(self.DOUBLE_CMD)
  80. elif model == "treble":
  81. self.ser.write(self.TREBLE_CMD)
  82. time.sleep(7)
  83. self.extend()
  84. return 1
  85. def stop(self):
  86. return 1
  87. def status(self):
  88. status = {"is_connected": self.is_connected}
  89. return status
  90. def close(self):
  91. if self.ser:
  92. self.ser.close()
  93. self.is_connected = False
  94. return {"is_connected": self.is_connected}