''' @Author : liujunshen @Ide : vscode @File : fubo_pneumatic_finger.py @Time : 2023/04/03 16:49:11 ''' import logging import time import serial from serial.tools.list_ports import comports from core.peripheral.hand.base import PeripheralHandBase logger = logging.getLogger(__name__) def get_serial_ports(): """获取可选端口""" ports = list(comports(include_links=False)) available_ports_list = [port.device for port in ports] return available_ports_list class FuboPneumaticFingerClient(PeripheralHandBase): """富伯客户端""" FLEX_CMD = b"F" EXTEND_CMD = b"E" BALL_CMD = b"B" CYLINDER_CMD = b"C" DOUBLE_CMD = b"D" TREBLE_CMD = b"T" def __init__(self, init_params=None): self.baud_rate = 9600 self.data_bite = 8 self.timeout = 1 self.stop_bit = serial.STOPBITS_ONE # 停止位 self.parity_bit = serial.PARITY_NONE # 校验位 self.is_connected = False self.ser = None # 连接对象 self.port = "COM 4" if init_params: self.port = init_params["port"] def __del__(self): self.ser.close() def connect(self): try: self.ser = serial.Serial(port=self.port, baudrate=self.baud_rate, parity=self.parity_bit, stopbits=self.stop_bit, timeout=self.timeout) if self.ser.is_open: self.is_connected = True logger.info("connect") return 1 else: self.is_connected = False logger.warning("open failed") return 0 except OSError as e: warning_info = f"pneumatic finger connect failed: {e}" logger.warning(warning_info) return 0 def flex(self): self.ser.write(self.FLEX_CMD) return self.ser.read() def extend(self): self.ser.write(self.EXTEND_CMD) return self.ser.read() def reconnect(self): self.close() return self.connect() def init(self): ret = self.connect() return {"is_connected": self.is_connected, "msg": ret} def start(self, train=None): model = train.fubo_pneumatic_finger.pneumatic_finger_model if model == "flex": self.flex() elif model == "ball": self.ser.write(self.BALL_CMD) elif model == "cylinder": self.ser.write(self.CYLINDER_CMD) elif model == "double": self.ser.write(self.DOUBLE_CMD) elif model == "treble": self.ser.write(self.TREBLE_CMD) time.sleep(7) self.extend() return 1 def stop(self): return 1 def status(self): status = {"is_connected": self.is_connected} return status def close(self): if self.ser: self.ser.close() self.is_connected = False return {"is_connected": self.is_connected}