123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import logging
- import serial
- from serial.tools.list_ports import comports
- logger = logging.getLogger(__name__)
- def get_serial_ports():
- """获取可选端口"""
- ports = list(comports(include_links=False))
- available_ports_list = [port.device for port in ports]
- return available_ports_list
- class FingerController:
- def __init__(self, mode='step', init_params=dict()):
- self.hand = FuboPneumaticFingerClient(init_params)
- self.current_state = 'rest'
- if mode not in ['edge', 'step']:
- raise ValueError('mode must be in \{"edge", "step"\}')
- self.mode = mode
- def move(self, action='flex'):
- if self.mode == 'edge':
- self._edge(action)
- elif self.mode == 'step':
- self._step(action)
-
- def _step(self, action='flex'):
- if action == 'rest':
- self.hand.start('extend')
- else:
- self.hand.start(action)
- self.current_state = action
- def _edge(self, action='flex'):
- if action == 'rest':
- return
- if self.current_state == 'extend':
- self.hand.start(action)
- self.current_state = action
- elif self.current_state == action:
- self.hand.start('extend')
- self.current_state = 'extend'
- else: # rest or any other
- self.hand.start(action)
- self.current_state = action
- class FuboPneumaticFingerClient:
- """富伯客户端"""
-
- COMMAND_TABLE = {
- 'rest': b"R",
- 'cylinder': b"C",
- 'ball': b"B",
- 'flex': b"F",
- 'double': b"D",
- 'treble': b"T",
- 'extend': b"E",
- }
- def __init__(self, init_params=None):
- self.baud_rate = 9600
- self.data_bite = 8
- self.timeout = 1
- self.stop_bit = serial.STOPBITS_ONE # 停止位
- self.parity_bit = serial.PARITY_NONE # 校验位
- self.is_connected = False
- self.ser = None # 连接对象
- self.port = "COM 4"
- if init_params:
- self.port = init_params["port"]
-
- self.connect()
- def __del__(self):
- self.ser.close()
- def connect(self):
- try:
- self.ser = serial.Serial(port=self.port,
- baudrate=self.baud_rate,
- parity=self.parity_bit,
- stopbits=self.stop_bit,
- timeout=self.timeout)
- if self.ser.is_open:
- self.is_connected = True
- logger.info("connect")
- return 1
- else:
- self.is_connected = False
- logger.warning("open failed")
- return 0
- except OSError as e:
- warning_info = f"pneumatic finger connect failed: {e}"
- logger.warning(warning_info)
- return 0
-
- def start(self, command):
- self.ser.write(self.COMMAND_TABLE[command])
- def status(self):
- status = {"is_connected": self.is_connected}
- return status
- def close(self):
- if self.ser:
- self.ser.close()
- self.is_connected = False
- return {"is_connected": self.is_connected}
|