'''
@Author : liujunshen
@Ide : vscode
@File : route_peripheral.py
@Time : 2023/03/29 16:14:13
'''
# HTTP routes for peripheral devices: listing serial ports and driving a
# single, process-wide hand peripheral through PeripheralHandManager.
from fastapi import APIRouter
from fastapi import Body
from fastapi import Depends
from starlette.responses import JSONResponse
from sqlalchemy.orm import Session

from core.peripheral.manager import PeripheralHandManager
from core.peripheral.hand.fubo_pneumatic_finger import get_serial_ports
from db.repository import trains as db_rep_train
from db.session import get_db
from settings.config import settings

# Localised message table, selected once at import time from the configured
# language.
language = settings.config["lang"]
message_dict = settings.get_message()
message = message_dict[language]

router = APIRouter()

# The one active hand manager for this process; None until /hand/init has
# been called, and reset to None again by /hand/close.
hand_manager = None


def _hand_not_initialized(**extra):
    """Build the response returned when no hand peripheral is initialised.

    Args:
        **extra: Additional top-level fields placed before ``msg`` in the
            JSON payload (e.g. ``is_connected=False`` for the status route).

    Returns:
        A JSONResponse carrying the localised "not initialised" message.
    """
    return JSONResponse({**extra, "msg": message["hand_peripheral_not_init"]})


@router.get("/serial-ports")
def get_ports():
    """Return whatever ``get_serial_ports()`` reports, under ``available_ports``."""
    available_ports = get_serial_ports()
    return JSONResponse({"available_ports": available_ports})


@router.post("/hand/init")
def hand_init(device_name: str = Body(...), init_params: dict = Body(...)):
    """Create the hand manager (replacing any existing one) and initialise it.

    Args:
        device_name: Required body field, passed to PeripheralHandManager.
        init_params: Required body field, passed to PeripheralHandManager.

    Returns:
        A JSONResponse wrapping the result of ``hand_manager.init()``.
    """
    global hand_manager
    # Close the previous manager before it is replaced.
    if hand_manager is not None:
        hand_manager.close()
    hand_manager = PeripheralHandManager(device_name, init_params)
    return JSONResponse(hand_manager.init())


@router.get("/hand/start")
def hand_start(train_id: int, db: Session = Depends(get_db)):
    """Start the hand peripheral for the train identified by ``train_id``.

    Args:
        train_id: Identifier handed to ``db_rep_train.retrieve_train``.
        db: Database session injected through ``get_db``.

    Returns:
        A JSONResponse wrapping ``hand_manager.start(train)``, or the
        "not initialised" message when no manager exists.
    """
    # Check the manager first so an uninitialised peripheral never costs a
    # database lookup, and a lookup failure cannot mask the real problem.
    if hand_manager is None:
        return _hand_not_initialized()
    train = db_rep_train.retrieve_train(train_id, db)
    return JSONResponse(hand_manager.start(train))


@router.get("/hand/stop")
def hand_stop():
    """Stop the hand peripheral and return the manager's result."""
    if hand_manager is None:
        return _hand_not_initialized()
    return JSONResponse(hand_manager.stop())


@router.get("/hand/status")
def hand_status():
    """Return the manager's status, or ``is_connected: False`` if none exists."""
    if hand_manager is None:
        return _hand_not_initialized(is_connected=False)
    return JSONResponse(hand_manager.status())


@router.get("/hand/close")
def hand_close():
    """Close the hand peripheral and drop the module-level manager.

    Returns:
        A JSONResponse wrapping ``hand_manager.close()``, or the
        "not initialised" message when no manager exists.
    """
    global hand_manager
    if hand_manager is None:
        return _hand_not_initialized()
    msg = hand_manager.close()
    # Rebind straight to None. A preceding `del` would leave the global
    # unbound for a moment, and a concurrent request reading it would raise
    # NameError instead of seeing "not initialised".
    hand_manager = None
    return JSONResponse(msg)