123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- '''
- @Author : liujunshen
- @Ide : vscode
- @File : ruishou.py
- @Time : 2023/03/28 16:54:03
- '''
- import logging
- from socket import socket
- import struct
- import threading
- import time
- from typing import Optional
- from core.peripheral.hand.base import PeripheralHandBase
- from settings.config import settings
- hand_config = settings.config["hand_peripheral_parameter"]
- logger = logging.getLogger(__name__)
- def reconnect_decorator(func):
- """重新连接装饰器"""
- def inner(self, *args, **kwargs):
- try:
- return func(self, *args, **kwargs)
- except Exception:
- self.close()
- self._start_client()
- return func(self, *args, **kwargs)
- return inner
- class Constants:
- """睿手相关常量"""
- CMD_LOCATION = 3
- HANDSHAKE_CMD = 0x01
- HEARTBEAT_CMD = 0x02
- SET_CURRENT_CMD = 0x03
- MOTION_CONTROL_CMD = 0x04
- DRAFTING_ACTION_CMD = 0x05
- FINISH_ACTION_CMD = 0x06
- class SendPckLocation:
- """睿手发送信号功能对于位置"""
- HANDSHAKE_VERSION_H = 4
- HANDSHAKE_VERSION_L = 5
- SET_CURRENT_CMD = 4
- SET_CURRENT_CHANNEL = 5
- SET_CURRENT_VALUE = 6
- MOTION_CONTROL_HAND = 4
- MOTION_CONTROL_THUMB_BENDING = 5
- MOTION_CONTROL_INDEX_FINGER_BENDING = 6
- MOTION_CONTROL_MIDDLE_FINGER_BENDING = 7
- MOTION_CONTROL_RING_FINGER_BENDING = 8
- MOTION_CONTROL_LITTLE_FINGER_BENDING = 9
- MOTION_CONTROL_DURATION = 10
- DRAFTING_ACTION_HAND = 4
- DRAFTING_ACTION_IS_ELECTRIC = 5
- DRAFTING_ACTION_CHANNELS = 6
- DRAFTING_ACTION_CHANNEL_A_VALUE = 7
- DRAFTING_ACTION_CHANNEL_B_VALUE = 8
- DRAFTING_ACTION_DURATION = 9
- class RecvPckLocation:
- """睿手接收信号功能对于位置"""
- HANDSHAKE_STATUS = 4
- HANDSHAKE_REASON = 5
- SET_CURRENT_CHANNEL = 5
- SET_CURRENT_VALUE = 6
- MOTION_CONTROL_STATUS = 4
- MOTION_CONTROL_DURATION = 5
- DRAFTING_ACTION_STATUS = 4
- DRAFTING_ACTION_DURATION = 5
- FINISH_ACTION_STATUS = 4
- FINISH_ACTION_DURATION = 5
- class RecvStatus:
- """睿手接收信号功能响应状态"""
- HANDSHAKE_SUCCESS = 0x00
- HANDSHAKE_FAIL = 0x01
- HANDSHAKE_REASON_OTHER = 0x00
- HANDSHAKE_REASON_DIFF_VERSION = 0x01
- class PckValue:
- """睿手数据包值"""
- BOTH_HANDS = 0x01
- LEFT_HAND = 0x02
- RIGHT_HAND = 0x03
- class RuishouConnector:
- """睿手客户端
- 功能:连接;发送心跳包;开启线程发送信号;接收信号等
- """
- def __init__(self) -> None:
- self.__host = hand_config["hand_host"]
- self.__port = hand_config["hand_port"]
- self.__heart_interval = hand_config["hand_heart"]
- self.__hand_version = hand_config["hand_version"]
- self.__addr = (self.__host, self.__port)
- self.protocol = Protocol()
- self.sock = None
- self.is_connected = False
- def start_client(self):
- """启动客户端"""
- version_parm = {
- Constants.SendPckLocation.HANDSHAKE_VERSION_H:
- self.__hand_version[0],
- Constants.SendPckLocation.HANDSHAKE_VERSION_L:
- self.__hand_version[1]
- }
- sock = socket()
- # 链接服务端地址
- logger.info("Connecting to and hand peripheral...")
- self.sock = sock
- self.sock.connect(self.__addr)
- logger.info("Hand Peripheral connected successfully.")
- # TODO: 连接失败的logger
- self.sync_send_data("handshake", version_parm)
- self.is_connected = True
- logger.info("handshake...")
- # 向服务端发送心跳包
- send_heartbeat_t = threading.Thread(target=self.__send_beat_data,
- args=())
- # recv_t.setDaemon(True)
- # TODO: 目前启动线程接收会导致同步接口无法接受到数据(接收线程已接收), 后续可考虑上锁解决
- # recv_t.start()
- send_heartbeat_t.start()
- def create_send_t(self, send_data):
- """外部程序调用"""
- send_t = threading.Thread(target=self.send_data, args=(send_data,))
- send_t.start()
- send_t.join()
- def send_data(self, cmd, update=None):
- if update is None:
- update = dict()
- send_data = self.protocol.get_pck(cmd, update)
- self.sock.sendall(send_data)
- def sync_send_data(self, cmd, update=None):
- """同步接口"""
- if update is None:
- update = dict()
- send_data = self.protocol.get_pck(cmd, update)
- if send_data:
- self.sock.send(send_data)
- res = self.filter_recv_msg()
- return res
- else:
- return None
- def filter_recv_msg(self):
- times = 0
- res = {"msg": "fail"}
- while times <= 50:
- recv = self.sock.recv(1024)
- recv_ls = self.protocol.unpack_bytes(recv)
- for recv in recv_ls:
- res = self.protocol.parse_bytes(recv)
- if res["cmd"] != 2:
- return res
- times += 1
- return res
- def __send_beat_data(self):
- try:
- while True:
- self.is_connected = True
- self.send_data("heartbeat")
- time.sleep(self.__heart_interval)
- except Exception:
- logger.info(
- "Send beat data failed, Hand Peripheral may be disconnected.")
- self.is_connected = False
- def close_client(self):
- if self.sock:
- self.sock.close()
- self.is_connected = False
- class Protocol:
- """睿手封装/解析包"""
- def __init__(self) -> None:
- self.pck_map = {
- "handshake": [
- 0xAC, 0xAD, 0x05, 0x01, None, None, 0xFF, 0xFF, None, None
- ],
- "heartbeat": [
- 0xAC, 0xAD, 0x05, 0x02, 0xFF, 0xFF, 0xFF, 0xFF, None, None
- ],
- "default_current": [
- 0xAC, 0xAD, 0x05, 0x03, None, None, None, 0xFF, None, None
- ],
- "motion_control": [
- 0xAC, 0xAD, 0x08, 0x04, None, None, None, None, None, None,
- None, None, None
- ],
- "drafting_action": [
- 0xAC, 0xAD, 0x07, 0x05, None, None, None, None, None, None,
- None, None
- ],
- "finish_action": [
- 0xAC, 0xAD, 0x05, 0x06, 0xFF, 0xFF, 0xFF, 0xFF, None, None
- ]
- }
- # 映射结果命令对应结果文本
- self.recv_map = {
- "handshake": {
- "byte0": {
- 0x00: "success",
- 0x01: "fail"
- },
- "byte1": {
- 0x00: "other",
- 0x01: "diff version"
- }
- },
- "heartbeat": {""}
- }
- self.recv_head = (0xAE, 0xAF) # 睿手端数据包头
- @staticmethod
- def cal_checksum(data):
- b_checksum = sum(data).to_bytes(2, byteorder="big")
- byte_h, byte_l = struct.unpack("2B", b_checksum)
- return byte_h, byte_l
- def get_pck(self, cmd, update_pck=None) -> Optional[bytearray]:
- """根据命令,参数更新包组装数据包
- Args:
- cmd: 命令 ['handshake', 'heartbeat', 'default_current',
- 'motion_control', 'drafting_action', 'finish_action']
- update_pck: 更新数据:电流参数等
- Returns:符合协议的bytes指令
- """
- if update_pck is None:
- update_pck = dict()
- arr = self.pck_map.get(cmd, None)
- if not arr:
- return None
- for key in update_pck.keys():
- if key > len(arr):
- return None
- arr[key] = update_pck[key]
- if None in arr[:-2]:
- return None
- bs = bytearray(0)
- byte_h, byte_l = self.cal_checksum(arr[3:-2])
- arr[-2], arr[-1] = byte_h, byte_l
- for i in arr:
- bs += bytearray(i.to_bytes(1, byteorder="little"))
- return bs
- def unpack_bytes(self, recv_bytes) -> list:
- """将socket.recv数据按包头进行拆包
- Args:
- recv_bytes: 接收的数据包
- Returns: 分割后的命令数组
- """
- recv_list = struct.unpack_from(f"{len(recv_bytes)}B", recv_bytes)
- res_list = []
- for idx in range(len(recv_list) - 1):
- # 寻找包头
- if recv_list[idx] == self.recv_head[0] and (recv_list[idx + 1]
- == self.recv_head[1]):
- head_idx = idx
- data_len = recv_list[head_idx + 2] + 5
- res_list.append(recv_list[head_idx:data_len + head_idx])
- return res_list
- def parse_bytes(self, recv_nums) -> dict:
- """解析封装结果信息
- Args:
- recv_nums: 单个数据包数组
- Returns:封装后的结果信息
- """
- res = dict()
- res["cmd"] = recv_nums[Constants.CMD_LOCATION]
- if recv_nums[Constants.CMD_LOCATION] == Constants.HANDSHAKE_CMD:
- res["status"] = recv_nums[
- Constants.RecvPckLocation.HANDSHAKE_STATUS]
- res["reason"] = recv_nums[
- Constants.RecvPckLocation.HANDSHAKE_REASON]
- elif recv_nums[Constants.CMD_LOCATION] == Constants.SET_CURRENT_CMD:
- res["current_channel"] = recv_nums[
- Constants.RecvPckLocation.SET_CURRENT_CHANNEL]
- res["current_val"] = recv_nums[
- Constants.RecvPckLocation.SET_CURRENT_VALUE]
- elif recv_nums[Constants.CMD_LOCATION] == Constants.MOTION_CONTROL_CMD:
- res["motion_control_status"] = recv_nums[
- Constants.RecvPckLocation.MOTION_CONTROL_STATUS]
- res["motion_control_duration"] = recv_nums[
- Constants.RecvPckLocation.MOTION_CONTROL_DURATION]
- elif recv_nums[Constants.CMD_LOCATION] == Constants.DRAFTING_ACTION_CMD:
- res["drafting_action_status"] = recv_nums[
- Constants.RecvPckLocation.DRAFTING_ACTION_STATUS]
- res["drafting_action_duration"] = recv_nums[
- Constants.RecvPckLocation.DRAFTING_ACTION_DURATION]
- elif recv_nums[Constants.CMD_LOCATION] == Constants.FINISH_ACTION_CMD:
- res["drafting_action_status"] = recv_nums[
- Constants.RecvPckLocation.FINISH_ACTION_STATUS]
- res["drafting_action_duration"] = recv_nums[
- Constants.RecvPckLocation.FINISH_ACTION_DURATION]
- return res
- class RuishouClient(PeripheralHandBase):
- def __init__(self) -> None:
- self.connector = RuishouConnector()
- self.version = (0x01, 0x00) # 睿手版本
- self.model = None
- def _start_client(self, version=None):
- """启动连接"""
- if self.connector.is_connected:
- return 1, "already connect"
- if not version:
- version = self.version
- try:
- self.connector.start_client()
- return 1, "success connect"
- except ConnectionRefusedError as e:
- return 0, f"fail, {e}"
- def _set_current(self, channel, val):
- """调节预设电流"""
- if not self.connector.is_connected:
- return 0
- parm_d = dict()
- parm_d[Constants.SendPckLocation.SET_CURRENT_CHANNEL] = channel
- parm_d[Constants.SendPckLocation.SET_CURRENT_VALUE] = val
- parm_d[Constants.SendPckLocation.SET_CURRENT_CMD] = 1 # 开始
- self.connector.sync_send_data(cmd="default_current", update=parm_d)
- parm_d[Constants.SendPckLocation.SET_CURRENT_CMD] = 3 # 调节
- self.connector.sync_send_data(cmd="default_current", update=parm_d)
- parm_d[Constants.SendPckLocation.SET_CURRENT_CMD] = 2 # 结束
- self.connector.sync_send_data(cmd="default_current", update=parm_d)
- return 1
- @staticmethod
- def _change_hand_data(hand):
- if hand == "双手":
- return Constants.PckValue.BOTH_HANDS
- if hand == "左手":
- return Constants.PckValue.LEFT_HAND
- if hand == "右手":
- return Constants.PckValue.RIGHT_HAND
- @reconnect_decorator
- def _control_motion(self, grabbing_param):
- logger.info("Launch peripheral...")
- parm_d = {
- Constants.SendPckLocation.MOTION_CONTROL_HAND:
- self._change_hand_data(grabbing_param.hand_select),
- Constants.SendPckLocation.MOTION_CONTROL_THUMB_BENDING:
- grabbing_param.thumb,
- Constants.SendPckLocation.MOTION_CONTROL_INDEX_FINGER_BENDING:
- grabbing_param.index_finger,
- Constants.SendPckLocation.MOTION_CONTROL_MIDDLE_FINGER_BENDING:
- grabbing_param.middle_finger,
- Constants.SendPckLocation.MOTION_CONTROL_RING_FINGER_BENDING:
- grabbing_param.ring_finger,
- Constants.SendPckLocation.MOTION_CONTROL_LITTLE_FINGER_BENDING:
- grabbing_param.little_finger,
- Constants.SendPckLocation.MOTION_CONTROL_DURATION:
- grabbing_param.duration
- }
- res = self.connector.sync_send_data(cmd="motion_control", update=parm_d)
- logger.info("Launch peripheral success")
- return res
- @reconnect_decorator
- def _drafting_action(self, drafting_param):
- if not self.connector.is_connected:
- return 0
- parm_d = {
- Constants.SendPckLocation.DRAFTING_ACTION_HAND:
- drafting_param.hand_select,
- Constants.SendPckLocation.DRAFTING_ACTION_IS_ELECTRIC:
- drafting_param.is_electric,
- Constants.SendPckLocation.DRAFTING_ACTION_CHANNELS:
- drafting_param.draft_channel,
- Constants.SendPckLocation.DRAFTING_ACTION_CHANNEL_A_VALUE:
- drafting_param.a_channel_value,
- Constants.SendPckLocation.DRAFTING_ACTION_CHANNEL_B_VALUE:
- drafting_param.b_channel_value,
- Constants.SendPckLocation.DRAFTING_ACTION_DURATION:
- drafting_param.duration
- }
- res = self.connector.sync_send_data(cmd="motion_control", update=parm_d)
- return res
- def init(self):
- _, msg = self._start_client()
- ret = {"is_connected": self.connector.is_connected, "msg": msg}
- return ret
- def set_db_model(self, db_model):
- self.model = db_model
- def start(self, train):
- ret_msg = self._control_motion(train.hand_peripherals)
- return ret_msg
- def stop(self):
- res = self.connector.sync_send_data(cmd="finish_action")
- return res
- def status(self):
- status = {"is_connected": self.connector.is_connected}
- return status
- def close(self):
- self.connector.close_client()
- ret = {"is_connected": False}
- return ret
|