''' @Author : gongchanghui @Ide : vscode @File : fubo_mechanical_finger.py @Time : 2023/08/29 16:49:11 ''' import logging import time import enum import threading import serial from serial.tools.list_ports import comports from core.peripheral.hand.base import PeripheralHandBase from settings.config import settings mechanical_finger_config = settings.config["hand_peripheral_parameter"] logger = logging.getLogger(__name__) def get_serial_ports(): """获取可选端口""" ports = list(comports(include_links=False)) available_ports_list = [port.device for port in ports] return available_ports_list class DeviceStatus(enum.Enum): NotOpend = 0 Opened = 1 DeviceRecieved = 2 RMTC_GLOVE_Sended=3 Get_Recieved=4 Running=5 Invalid=-1 class RecievedPackageType(enum.Enum): Device_Package = 0 Get_Package = 1 Other_Package = 2 No_Package = 3 class FuboMechanicalFingerConnector: """富伯客户端 功能:连接;发送持续控制包维持链接;开启线程发送信号;接收信号等 """ def __init__(self, port) -> None: self.__port = port self.__heart_interval = 0.1 self.__serial = None self.__baud_rate = 57600 self.__data_bite = 8 self.__timeout = 1 self.__stop_bit = serial.STOPBITS_ONE # 停止位 self.__parity_bit = serial.PARITY_NONE # 校验位 self.__extend = False self.__extend_target_time = time.time()-1 self.__thumb = 60 self.__index_finger = 60 self.__middle_finger = 60 self.__ring_finger = 60 self.__little_finger = 60 self.__duration = 10 self.is_connected = False self.stop_flag = False def connect(self): if self.__serial is not None: self.__serial.close() self.__serial = serial.Serial(port=self.__port, baudrate=self.__baud_rate, parity=self.__parity_bit, stopbits=self.__stop_bit, timeout=self.__timeout) if self.__serial.is_open: logger.info("Open Fubo Mechanical Finger Device Failed...") else: logger.warning("Open Fubo Mechanical Finger Device Failed...") return self.__serial.is_open def start_client(self): """启动客户端""" try: self.is_connected = self.connect() except Exception: return False self.stop_flag = False if not self.is_connected: return False self.__extend_target_time = time.time()+2 # 向服务端发送心跳包 working_thread = threading.Thread(target=self.__working_thrend_func, args=()) working_thread.start() return True def create_send_t(self, send_data): """外部程序调用""" send_t = threading.Thread(target=self.send_data, args=(send_data,)) send_t.start() send_t.join() def send_data(self, cmd, update=None): if update is None: update = dict() send_data = self.protocol.get_pck(cmd, update) self.sock.sendall(send_data) def sync_send_data(self, cmd, update=None): """同步接口""" if update is None: update = dict() send_data = self.protocol.get_pck(cmd, update) if send_data: self.sock.send(send_data) res = self.filter_recv_msg() return res else: return None def extend(self, thumb, index_finger, middle_finger, ring_finger, little_finger, duration): self.__extend_target_time = time.time()+int(duration) self.__thumb = int(thumb) self.__index_finger = index_finger self.__middle_finger = middle_finger self.__ring_finger = ring_finger self.__little_finger = little_finger self.__duration = duration def flex(self): self.__extend_target_time = time.time()-1 def __working_thrend_func(self): device_status = DeviceStatus.Opened last_device_status = device_status command_count = 0 time_last_receive_package = time.time() while self.stop_flag is not True: try: if not self.__serial.is_open: self.is_connected = self.connect() data_count_in_buffer = self.__serial.in_waiting in_data = None in_data_str = None package_type = RecievedPackageType.No_Package if data_count_in_buffer > 0: now = time.time() if now - time_last_receive_package > 3: device_status = DeviceStatus.Opened in_data = self.__serial.read(data_count_in_buffer) in_data_str = str(in_data, encoding="utf-8") if data_count_in_buffer >= 3 and "get" in in_data_str: package_type = RecievedPackageType.Get_Package time_last_receive_package = now elif data_count_in_buffer >= 5 and "device" in in_data_str: package_type = RecievedPackageType.Device_Package time_last_receive_package = now self.__extend_target_time = time.time()+2 else: package_type = RecievedPackageType.Other_Package if last_device_status != device_status: print(device_status) last_device_status = device_status #if data_count_in_buffer > 0: #print(device_status) #print_hex("recv data:", in_data) if device_status is DeviceStatus.NotOpend: pass elif device_status is DeviceStatus.Opened: if data_count_in_buffer <= 0: continue if package_type is RecievedPackageType.Device_Package: print("**** device command recieved***") device_status = DeviceStatus.DeviceRecieved elif device_status is DeviceStatus.DeviceRecieved: self.__serial.write(b"RMTC_GLOVE\r") device_status = DeviceStatus.RMTC_GLOVE_Sended pass elif device_status is DeviceStatus.RMTC_GLOVE_Sended: if data_count_in_buffer <= 0: continue if package_type is RecievedPackageType.Get_Package: print("**** get recieved***") device_status = DeviceStatus.Get_Recieved elif device_status is DeviceStatus.Get_Recieved: self.__serial.write(b"DATA:0:40:0:40:0:40:0:40:0:40\r") device_status = DeviceStatus.Running elif device_status is DeviceStatus.Running: if now > self.__extend_target_time: self.__serial.write(b"DATA:0:00:0:00:0:00:0:00:0:00\r") else: cmd_str = "DATA:0:{0}:0:{1}:0:{2}:0:{3}:0:{4}\r".format(self.__thumb, self.__index_finger, self.__middle_finger, self.__ring_finger, self.__little_finger) self.__serial.write(cmd_str.encode('UTF-8')) elif device_status is DeviceStatus.Invalid: pass if package_type is RecievedPackageType.Device_Package and \ device_status is not DeviceStatus.Opened and \ device_status is not DeviceStatus.DeviceRecieved and \ device_status is not DeviceStatus.RMTC_GLOVE_Sended: print("**** device command recieved***") device_status = DeviceStatus.DeviceRecieved except Exception as e: logger.info( "Send beat data failed, Hand Peripheral may be disconnected.") self.is_connected = False time.sleep(self.__heart_interval) if self.__serial is not None and self.__serial.is_open: self.__serial.close() self.__serial = None def close_client(self): self.stop_flag = True class FuboMechanicalFingerClient(PeripheralHandBase): """富伯机械手客户端""" def __init__(self, init_params=None): if init_params: self.port = init_params["port"] self.__thumb = int(init_params["thumb"]) self.__index_finger = int(init_params["index_finger"]) self.__middle_finger = int(init_params["middle_finger"]) self.__ring_finger = int(init_params["ring_finger"]) self.__little_finger = int(init_params["little_finger"]) self.__duration = int(init_params["duration"]) else: self.port = "COM12" self.__thumb = 60 self.__index_finger = 60 self.__middle_finger = 60 self.__ring_finger = 60 self.__little_finger = 60 self.__duration = 10 self.connector = FuboMechanicalFingerConnector(self.port) def __del__(self): if self.connector is not None: self.connector.close_client() self.connector = None def init(self): ret = self.connector.start_client() return {"is_connected": self.connector.is_connected, "msg": ret} def start(self, train=None): self.connector.extend(self.__thumb, self.__index_finger, self.__middle_finger, self.__ring_finger, self.__little_finger, self.__duration) return 1 def stop(self): self.connector.flex() return 1 def status(self): status = {"is_connected": self.connector.is_connected} return status def close(self): if self.connector: self.connector.close_client() self.connector = None return {"is_connected": False}