|
- '''
- @Author : gongchanghui
- @Ide : vscode
- @File : fubo_mechanical_finger.py
- @Time : 2023/08/29 16:49:11
- '''
- import logging
- import time
- import enum
- import threading
- import serial
- from serial.tools.list_ports import comports
- from core.peripheral.hand.base import PeripheralHandBase
- from settings.config import settings
- mechanical_finger_config = settings.config["hand_peripheral_parameter"]
- logger = logging.getLogger(__name__)
- def get_serial_ports():
- """获取可选端口"""
- ports = list(comports(include_links=False))
- available_ports_list = [port.device for port in ports]
- return available_ports_list
- class DeviceStatus(enum.Enum):
- NotOpend = 0
- Opened = 1
- DeviceRecieved = 2
- RMTC_GLOVE_Sended=3
- Get_Recieved=4
- Running=5
- Invalid=-1
- class RecievedPackageType(enum.Enum):
- Device_Package = 0
- Get_Package = 1
- Other_Package = 2
- No_Package = 3
- class FuboMechanicalFingerConnector:
- """富伯客户端
- 功能:连接;发送持续控制包维持链接;开启线程发送信号;接收信号等
- """
- def __init__(self, port) -> None:
- self.__port = port
- self.__heart_interval = 0.1
- self.__serial = None
- self.__baud_rate = 57600
- self.__data_bite = 8
- self.__timeout = 1
- self.__stop_bit = serial.STOPBITS_ONE # 停止位
- self.__parity_bit = serial.PARITY_NONE # 校验位
- self.__extend = False
- self.__extend_target_time = time.time()-1
- self.__thumb = 60
- self.__index_finger = 60
- self.__middle_finger = 60
- self.__ring_finger = 60
- self.__little_finger = 60
- self.__duration = 10
- self.is_connected = False
- self.stop_flag = False
- def connect(self):
- if self.__serial is not None:
- self.__serial.close()
- self.__serial = serial.Serial(port=self.__port,
- baudrate=self.__baud_rate,
- parity=self.__parity_bit,
- stopbits=self.__stop_bit,
- timeout=self.__timeout)
- if self.__serial.is_open:
- logger.info("Open Fubo Mechanical Finger Device Failed...")
- else:
- logger.warning("Open Fubo Mechanical Finger Device Failed...")
- return self.__serial.is_open
- def start_client(self):
- """启动客户端"""
- try:
- self.is_connected = self.connect()
- except Exception:
- return False
- self.stop_flag = False
- if not self.is_connected:
- return False
- self.__extend_target_time = time.time()+2
- # 向服务端发送心跳包
- working_thread = threading.Thread(target=self.__working_thrend_func,
- args=())
- working_thread.start()
- return True
- def create_send_t(self, send_data):
- """外部程序调用"""
- send_t = threading.Thread(target=self.send_data, args=(send_data,))
- send_t.start()
- send_t.join()
- def send_data(self, cmd, update=None):
- if update is None:
- update = dict()
- send_data = self.protocol.get_pck(cmd, update)
- self.sock.sendall(send_data)
- def sync_send_data(self, cmd, update=None):
- """同步接口"""
- if update is None:
- update = dict()
- send_data = self.protocol.get_pck(cmd, update)
- if send_data:
- self.sock.send(send_data)
- res = self.filter_recv_msg()
- return res
- else:
- return None
- def extend(self, thumb, index_finger, middle_finger, ring_finger, little_finger, duration):
- self.__extend_target_time = time.time()+int(duration)
- self.__thumb = int(thumb)
- self.__index_finger = index_finger
- self.__middle_finger = middle_finger
- self.__ring_finger = ring_finger
- self.__little_finger = little_finger
- self.__duration = duration
- def flex(self):
- self.__extend_target_time = time.time()-1
- def __working_thrend_func(self):
- device_status = DeviceStatus.Opened
- last_device_status = device_status
- command_count = 0
- time_last_receive_package = time.time()
- while self.stop_flag is not True:
- try:
- if not self.__serial.is_open:
- self.is_connected = self.connect()
- data_count_in_buffer = self.__serial.in_waiting
- in_data = None
- in_data_str = None
- package_type = RecievedPackageType.No_Package
- if data_count_in_buffer > 0:
- now = time.time()
- if now - time_last_receive_package > 3:
- device_status = DeviceStatus.Opened
- in_data = self.__serial.read(data_count_in_buffer)
- in_data_str = str(in_data, encoding="utf-8")
- if data_count_in_buffer >= 3 and "get" in in_data_str:
- package_type = RecievedPackageType.Get_Package
- time_last_receive_package = now
- elif data_count_in_buffer >= 5 and "device" in in_data_str:
- package_type = RecievedPackageType.Device_Package
- time_last_receive_package = now
- self.__extend_target_time = time.time()+2
- else:
- package_type = RecievedPackageType.Other_Package
- if last_device_status != device_status:
- print(device_status)
- last_device_status = device_status
- #if data_count_in_buffer > 0:
- #print(device_status)
- #print_hex("recv data:", in_data)
- if device_status is DeviceStatus.NotOpend:
- pass
- elif device_status is DeviceStatus.Opened:
- if data_count_in_buffer <= 0:
- continue
- if package_type is RecievedPackageType.Device_Package:
- print("**** device command recieved***")
- device_status = DeviceStatus.DeviceRecieved
- elif device_status is DeviceStatus.DeviceRecieved:
- self.__serial.write(b"RMTC_GLOVE\r")
- device_status = DeviceStatus.RMTC_GLOVE_Sended
- pass
- elif device_status is DeviceStatus.RMTC_GLOVE_Sended:
- if data_count_in_buffer <= 0:
- continue
- if package_type is RecievedPackageType.Get_Package:
- print("**** get recieved***")
- device_status = DeviceStatus.Get_Recieved
- elif device_status is DeviceStatus.Get_Recieved:
- self.__serial.write(b"DATA:0:40:0:40:0:40:0:40:0:40\r")
- device_status = DeviceStatus.Running
- elif device_status is DeviceStatus.Running:
- if now > self.__extend_target_time:
- self.__serial.write(b"DATA:0:00:0:00:0:00:0:00:0:00\r")
- else:
- cmd_str = "DATA:0:{0}:0:{1}:0:{2}:0:{3}:0:{4}\r".format(self.__thumb,
- self.__index_finger,
- self.__middle_finger,
- self.__ring_finger,
- self.__little_finger)
- self.__serial.write(cmd_str.encode('UTF-8'))
- elif device_status is DeviceStatus.Invalid:
- pass
- if package_type is RecievedPackageType.Device_Package and \
- device_status is not DeviceStatus.Opened and \
- device_status is not DeviceStatus.DeviceRecieved and \
- device_status is not DeviceStatus.RMTC_GLOVE_Sended:
- print("**** device command recieved***")
- device_status = DeviceStatus.DeviceRecieved
- except Exception as e:
- logger.info(
- "Send beat data failed, Hand Peripheral may be disconnected.")
- self.is_connected = False
- time.sleep(self.__heart_interval)
- if self.__serial is not None and self.__serial.is_open:
- self.__serial.close()
- self.__serial = None
- def close_client(self):
- self.stop_flag = True
- class FuboMechanicalFingerClient(PeripheralHandBase):
- """富伯机械手客户端"""
- def __init__(self, init_params=None):
- if init_params:
- self.port = init_params["port"]
- self.__thumb = int(init_params["thumb"])
- self.__index_finger = int(init_params["index_finger"])
- self.__middle_finger = int(init_params["middle_finger"])
- self.__ring_finger = int(init_params["ring_finger"])
- self.__little_finger = int(init_params["little_finger"])
- self.__duration = int(init_params["duration"])
- else:
- self.port = "COM12"
- self.__thumb = 60
- self.__index_finger = 60
- self.__middle_finger = 60
- self.__ring_finger = 60
- self.__little_finger = 60
- self.__duration = 10
- self.connector = FuboMechanicalFingerConnector(self.port)
- def __del__(self):
- if self.connector is not None:
- self.connector.close_client()
- self.connector = None
- def init(self):
- ret = self.connector.start_client()
- return {"is_connected": self.connector.is_connected, "msg": ret}
- def start(self, train=None):
- self.connector.extend(self.__thumb,
- self.__index_finger,
- self.__middle_finger,
- self.__ring_finger,
- self.__little_finger,
- self.__duration)
- return 1
- def stop(self):
- self.connector.flex()
- return 1
- def status(self):
- status = {"is_connected": self.connector.is_connected}
- return status
- def close(self):
- if self.connector:
- self.connector.close_client()
- self.connector = None
- return {"is_connected": False}
|