1
0

fubo_pneumatic_finger.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. '''
  2. @Author : liujunshen
  3. @Ide : vscode
  4. @File : fubo_pneumatic_finger.py
  5. @Time : 2023/04/03 16:49:11
  6. '''
  7. import logging
  8. import time
  9. import serial
  10. from serial.tools.list_ports import comports
  11. logger = logging.getLogger(__name__)
  12. def get_serial_ports():
  13. """获取可选端口"""
  14. ports = list(comports(include_links=False))
  15. available_ports_list = [port.device for port in ports]
  16. return available_ports_list
  17. class FuboPneumaticFingerClient:
  18. """富伯客户端"""
  19. FLEX_CMD = b"F"
  20. EXTEND_CMD = b"E"
  21. BALL_CMD = b"B"
  22. CYLINDER_CMD = b"C"
  23. DOUBLE_CMD = b"D"
  24. TREBLE_CMD = b"T"
  25. RELEASE_CMD = b"R"
  26. def __init__(self, init_params=None):
  27. self.baud_rate = 9600
  28. self.data_bite = 8
  29. self.timeout = 1
  30. self.stop_bit = serial.STOPBITS_ONE # 停止位
  31. self.parity_bit = serial.PARITY_NONE # 校验位
  32. self.is_connected = False
  33. self.ser = None # 连接对象
  34. self.port = "COM 4"
  35. if init_params:
  36. self.port = init_params["port"]
  37. self.init()
  38. def __del__(self):
  39. self.ser.close()
  40. def connect(self):
  41. try:
  42. self.ser = serial.Serial(port=self.port,
  43. baudrate=self.baud_rate,
  44. parity=self.parity_bit,
  45. stopbits=self.stop_bit,
  46. timeout=self.timeout)
  47. if self.ser.is_open:
  48. self.is_connected = True
  49. logger.info("connect")
  50. return 1
  51. else:
  52. self.is_connected = False
  53. logger.warning("open failed")
  54. return 0
  55. except OSError as e:
  56. warning_info = f"pneumatic finger connect failed: {e}"
  57. logger.warning(warning_info)
  58. return 0
  59. def release(self):
  60. self.ser.write(self.RELEASE_CMD)
  61. return self.ser.read()
  62. def extend(self):
  63. self.ser.write(self.EXTEND_CMD)
  64. return self.ser.read()
  65. def reconnect(self):
  66. self.close()
  67. return self.connect()
  68. def start(self, model=None):
  69. if (model == "flex") or (model is None):
  70. self.ser.write(self.FLEX_CMD)
  71. elif model == "ball":
  72. self.ser.write(self.BALL_CMD)
  73. elif model == "cylinder":
  74. self.ser.write(self.CYLINDER_CMD)
  75. elif model == "double":
  76. self.ser.write(self.DOUBLE_CMD)
  77. elif model == "treble":
  78. self.ser.write(self.TREBLE_CMD)
  79. return self.ser.read()
  80. def start_round(self, model=None, time_interval=5):
  81. self.start(model)
  82. time.sleep(time_interval)
  83. self.extend()
  84. return 1
  85. def stop(self):
  86. return 1
  87. def status(self):
  88. status = {"is_connected": self.is_connected}
  89. return status
  90. def close(self):
  91. if self.ser:
  92. self.ser.close()
  93. self.is_connected = False
  94. return {"is_connected": self.is_connected}