''' @Author : liujunshen @Ide : vscode @File : ruishou.py @Time : 2023/03/28 16:54:03 ''' import logging from socket import socket import struct import threading import time from typing import Optional from core.peripheral.hand.base import PeripheralHandBase from settings.config import settings hand_config = settings.config["hand_peripheral_parameter"] logger = logging.getLogger(__name__) def reconnect_decorator(func): """重新连接装饰器""" def inner(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception: self.close() self._start_client() return func(self, *args, **kwargs) return inner class Constants: """睿手相关常量""" CMD_LOCATION = 3 HANDSHAKE_CMD = 0x01 HEARTBEAT_CMD = 0x02 SET_CURRENT_CMD = 0x03 MOTION_CONTROL_CMD = 0x04 DRAFTING_ACTION_CMD = 0x05 FINISH_ACTION_CMD = 0x06 class SendPckLocation: """睿手发送信号功能对于位置""" HANDSHAKE_VERSION_H = 4 HANDSHAKE_VERSION_L = 5 SET_CURRENT_CMD = 4 SET_CURRENT_CHANNEL = 5 SET_CURRENT_VALUE = 6 MOTION_CONTROL_HAND = 4 MOTION_CONTROL_THUMB_BENDING = 5 MOTION_CONTROL_INDEX_FINGER_BENDING = 6 MOTION_CONTROL_MIDDLE_FINGER_BENDING = 7 MOTION_CONTROL_RING_FINGER_BENDING = 8 MOTION_CONTROL_LITTLE_FINGER_BENDING = 9 MOTION_CONTROL_DURATION = 10 DRAFTING_ACTION_HAND = 4 DRAFTING_ACTION_IS_ELECTRIC = 5 DRAFTING_ACTION_CHANNELS = 6 DRAFTING_ACTION_CHANNEL_A_VALUE = 7 DRAFTING_ACTION_CHANNEL_B_VALUE = 8 DRAFTING_ACTION_DURATION = 9 class RecvPckLocation: """睿手接收信号功能对于位置""" HANDSHAKE_STATUS = 4 HANDSHAKE_REASON = 5 SET_CURRENT_CHANNEL = 5 SET_CURRENT_VALUE = 6 MOTION_CONTROL_STATUS = 4 MOTION_CONTROL_DURATION = 5 DRAFTING_ACTION_STATUS = 4 DRAFTING_ACTION_DURATION = 5 FINISH_ACTION_STATUS = 4 FINISH_ACTION_DURATION = 5 class RecvStatus: """睿手接收信号功能响应状态""" HANDSHAKE_SUCCESS = 0x00 HANDSHAKE_FAIL = 0x01 HANDSHAKE_REASON_OTHER = 0x00 HANDSHAKE_REASON_DIFF_VERSION = 0x01 class PckValue: """睿手数据包值""" BOTH_HANDS = 0x01 LEFT_HAND = 0x02 RIGHT_HAND = 0x03 class RuishouConnector: """睿手客户端 功能:连接;发送心跳包;开启线程发送信号;接收信号等 """ def __init__(self) -> None: self.__host = hand_config["hand_host"] self.__port = hand_config["hand_port"] self.__heart_interval = hand_config["hand_heart"] self.__hand_version = hand_config["hand_version"] self.__addr = (self.__host, self.__port) self.protocol = Protocol() self.sock = None self.is_connected = False def start_client(self): """启动客户端""" version_parm = { Constants.SendPckLocation.HANDSHAKE_VERSION_H: self.__hand_version[0], Constants.SendPckLocation.HANDSHAKE_VERSION_L: self.__hand_version[1] } sock = socket() # 链接服务端地址 logger.info("Connecting to and hand peripheral...") self.sock = sock self.sock.connect(self.__addr) logger.info("Hand Peripheral connected successfully.") # TODO: 连接失败的logger self.sync_send_data("handshake", version_parm) self.is_connected = True logger.info("handshake...") # 向服务端发送心跳包 send_heartbeat_t = threading.Thread(target=self.__send_beat_data, args=()) # recv_t.setDaemon(True) # TODO: 目前启动线程接收会导致同步接口无法接受到数据(接收线程已接收), 后续可考虑上锁解决 # recv_t.start() send_heartbeat_t.start() def create_send_t(self, send_data): """外部程序调用""" send_t = threading.Thread(target=self.send_data, args=(send_data,)) send_t.start() send_t.join() def send_data(self, cmd, update=None): if update is None: update = dict() send_data = self.protocol.get_pck(cmd, update) self.sock.sendall(send_data) def sync_send_data(self, cmd, update=None): """同步接口""" if update is None: update = dict() send_data = self.protocol.get_pck(cmd, update) if send_data: self.sock.send(send_data) res = self.filter_recv_msg() return res else: return None def filter_recv_msg(self): times = 0 res = {"msg": "fail"} while times <= 50: recv = self.sock.recv(1024) recv_ls = self.protocol.unpack_bytes(recv) for recv in recv_ls: res = self.protocol.parse_bytes(recv) if res["cmd"] != 2: return res times += 1 return res def __send_beat_data(self): try: while True: self.is_connected = True self.send_data("heartbeat") time.sleep(self.__heart_interval) except Exception: logger.info( "Send beat data failed, Hand Peripheral may be disconnected.") self.is_connected = False def close_client(self): if self.sock: self.sock.close() self.is_connected = False class Protocol: """睿手封装/解析包""" def __init__(self) -> None: self.pck_map = { "handshake": [ 0xAC, 0xAD, 0x05, 0x01, None, None, 0xFF, 0xFF, None, None ], "heartbeat": [ 0xAC, 0xAD, 0x05, 0x02, 0xFF, 0xFF, 0xFF, 0xFF, None, None ], "default_current": [ 0xAC, 0xAD, 0x05, 0x03, None, None, None, 0xFF, None, None ], "motion_control": [ 0xAC, 0xAD, 0x08, 0x04, None, None, None, None, None, None, None, None, None ], "drafting_action": [ 0xAC, 0xAD, 0x07, 0x05, None, None, None, None, None, None, None, None ], "finish_action": [ 0xAC, 0xAD, 0x05, 0x06, 0xFF, 0xFF, 0xFF, 0xFF, None, None ] } # 映射结果命令对应结果文本 self.recv_map = { "handshake": { "byte0": { 0x00: "success", 0x01: "fail" }, "byte1": { 0x00: "other", 0x01: "diff version" } }, "heartbeat": {""} } self.recv_head = (0xAE, 0xAF) # 睿手端数据包头 @staticmethod def cal_checksum(data): b_checksum = sum(data).to_bytes(2, byteorder="big") byte_h, byte_l = struct.unpack("2B", b_checksum) return byte_h, byte_l def get_pck(self, cmd, update_pck=None) -> Optional[bytearray]: """根据命令,参数更新包组装数据包 Args: cmd: 命令 ['handshake', 'heartbeat', 'default_current', 'motion_control', 'drafting_action', 'finish_action'] update_pck: 更新数据:电流参数等 Returns:符合协议的bytes指令 """ if update_pck is None: update_pck = dict() arr = self.pck_map.get(cmd, None) if not arr: return None for key in update_pck.keys(): if key > len(arr): return None arr[key] = update_pck[key] if None in arr[:-2]: return None bs = bytearray(0) byte_h, byte_l = self.cal_checksum(arr[3:-2]) arr[-2], arr[-1] = byte_h, byte_l for i in arr: bs += bytearray(i.to_bytes(1, byteorder="little")) return bs def unpack_bytes(self, recv_bytes) -> list: """将socket.recv数据按包头进行拆包 Args: recv_bytes: 接收的数据包 Returns: 分割后的命令数组 """ recv_list = struct.unpack_from(f"{len(recv_bytes)}B", recv_bytes) res_list = [] for idx in range(len(recv_list) - 1): # 寻找包头 if recv_list[idx] == self.recv_head[0] and (recv_list[idx + 1] == self.recv_head[1]): head_idx = idx data_len = recv_list[head_idx + 2] + 5 res_list.append(recv_list[head_idx:data_len + head_idx]) return res_list def parse_bytes(self, recv_nums) -> dict: """解析封装结果信息 Args: recv_nums: 单个数据包数组 Returns:封装后的结果信息 """ res = dict() res["cmd"] = recv_nums[Constants.CMD_LOCATION] if recv_nums[Constants.CMD_LOCATION] == Constants.HANDSHAKE_CMD: res["status"] = recv_nums[ Constants.RecvPckLocation.HANDSHAKE_STATUS] res["reason"] = recv_nums[ Constants.RecvPckLocation.HANDSHAKE_REASON] elif recv_nums[Constants.CMD_LOCATION] == Constants.SET_CURRENT_CMD: res["current_channel"] = recv_nums[ Constants.RecvPckLocation.SET_CURRENT_CHANNEL] res["current_val"] = recv_nums[ Constants.RecvPckLocation.SET_CURRENT_VALUE] elif recv_nums[Constants.CMD_LOCATION] == Constants.MOTION_CONTROL_CMD: res["motion_control_status"] = recv_nums[ Constants.RecvPckLocation.MOTION_CONTROL_STATUS] res["motion_control_duration"] = recv_nums[ Constants.RecvPckLocation.MOTION_CONTROL_DURATION] elif recv_nums[Constants.CMD_LOCATION] == Constants.DRAFTING_ACTION_CMD: res["drafting_action_status"] = recv_nums[ Constants.RecvPckLocation.DRAFTING_ACTION_STATUS] res["drafting_action_duration"] = recv_nums[ Constants.RecvPckLocation.DRAFTING_ACTION_DURATION] elif recv_nums[Constants.CMD_LOCATION] == Constants.FINISH_ACTION_CMD: res["drafting_action_status"] = recv_nums[ Constants.RecvPckLocation.FINISH_ACTION_STATUS] res["drafting_action_duration"] = recv_nums[ Constants.RecvPckLocation.FINISH_ACTION_DURATION] return res class RuishouClient(PeripheralHandBase): def __init__(self) -> None: self.connector = RuishouConnector() self.version = (0x01, 0x00) # 睿手版本 self.model = None def _start_client(self, version=None): """启动连接""" if self.connector.is_connected: return 1, "already connect" if not version: version = self.version try: self.connector.start_client() return 1, "success connect" except ConnectionRefusedError as e: return 0, f"fail, {e}" def _set_current(self, channel, val): """调节预设电流""" if not self.connector.is_connected: return 0 parm_d = dict() parm_d[Constants.SendPckLocation.SET_CURRENT_CHANNEL] = channel parm_d[Constants.SendPckLocation.SET_CURRENT_VALUE] = val parm_d[Constants.SendPckLocation.SET_CURRENT_CMD] = 1 # 开始 self.connector.sync_send_data(cmd="default_current", update=parm_d) parm_d[Constants.SendPckLocation.SET_CURRENT_CMD] = 3 # 调节 self.connector.sync_send_data(cmd="default_current", update=parm_d) parm_d[Constants.SendPckLocation.SET_CURRENT_CMD] = 2 # 结束 self.connector.sync_send_data(cmd="default_current", update=parm_d) return 1 @staticmethod def _change_hand_data(hand): if hand == "双手": return Constants.PckValue.BOTH_HANDS if hand == "左手": return Constants.PckValue.LEFT_HAND if hand == "右手": return Constants.PckValue.RIGHT_HAND @reconnect_decorator def _control_motion(self, grabbing_param): logger.info("Launch peripheral...") parm_d = { Constants.SendPckLocation.MOTION_CONTROL_HAND: self._change_hand_data(grabbing_param.hand_select), Constants.SendPckLocation.MOTION_CONTROL_THUMB_BENDING: grabbing_param.thumb, Constants.SendPckLocation.MOTION_CONTROL_INDEX_FINGER_BENDING: grabbing_param.index_finger, Constants.SendPckLocation.MOTION_CONTROL_MIDDLE_FINGER_BENDING: grabbing_param.middle_finger, Constants.SendPckLocation.MOTION_CONTROL_RING_FINGER_BENDING: grabbing_param.ring_finger, Constants.SendPckLocation.MOTION_CONTROL_LITTLE_FINGER_BENDING: grabbing_param.little_finger, Constants.SendPckLocation.MOTION_CONTROL_DURATION: grabbing_param.duration } res = self.connector.sync_send_data(cmd="motion_control", update=parm_d) logger.info("Launch peripheral success") return res @reconnect_decorator def _drafting_action(self, drafting_param): if not self.connector.is_connected: return 0 parm_d = { Constants.SendPckLocation.DRAFTING_ACTION_HAND: drafting_param.hand_select, Constants.SendPckLocation.DRAFTING_ACTION_IS_ELECTRIC: drafting_param.is_electric, Constants.SendPckLocation.DRAFTING_ACTION_CHANNELS: drafting_param.draft_channel, Constants.SendPckLocation.DRAFTING_ACTION_CHANNEL_A_VALUE: drafting_param.a_channel_value, Constants.SendPckLocation.DRAFTING_ACTION_CHANNEL_B_VALUE: drafting_param.b_channel_value, Constants.SendPckLocation.DRAFTING_ACTION_DURATION: drafting_param.duration } res = self.connector.sync_send_data(cmd="motion_control", update=parm_d) return res def init(self): _, msg = self._start_client() ret = {"is_connected": self.connector.is_connected, "msg": msg} return ret def set_db_model(self, db_model): self.model = db_model def start(self, train): ret_msg = self._control_motion(train.hand_peripherals) return ret_msg def stop(self): res = self.connector.sync_send_data(cmd="finish_action") return res def status(self): status = {"is_connected": self.connector.is_connected} return status def close(self): self.connector.close_client() ret = {"is_connected": False} return ret