route_peripheral.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. '''
  2. @Author : liujunshen
  3. @Ide : vscode
  4. @File : route_peripheral.py
  5. @Time : 2023/03/29 16:14:13
  6. '''
  7. from fastapi import APIRouter
  8. from fastapi import Body
  9. from fastapi import Depends
  10. from starlette.responses import JSONResponse
  11. from sqlalchemy.orm import Session
  12. from core.peripheral.manager import PeripheralHandManager
  13. from core.peripheral.hand.fubo_pneumatic_finger import get_serial_ports
  14. from db.repository import trains as db_rep_train
  15. from db.session import get_db
  16. from settings.config import settings
  # Router that collects every peripheral endpoint declared in this module.
  router = APIRouter()

  # Module-wide hand manager. It stays None until /hand/init builds one and
  # is set back to None by /hand/close.
  hand_manager = None

  # Response texts: look up the message table for the configured language.
  language = settings.config["lang"]
  message_dict = settings.get_message()
  message = message_dict[language]
  @router.get("/serial-ports")
  def get_ports():
      """Report the serial ports returned by ``get_serial_ports()``.

      Returns:
          A ``JSONResponse`` whose body holds the result under the
          ``available_ports`` key.
      """
      payload = {"available_ports": get_serial_ports()}
      return JSONResponse(payload)
  @router.post("/hand/init")
  def hand_init(device_name: str = Body(...), init_params: dict = Body(...)):
      """Replace the module-level hand manager with a new one and initialise it.

      Any manager that is already registered is closed first.

      Args:
          device_name: Forwarded as the first argument of
              ``PeripheralHandManager``.
          init_params: Forwarded as the second argument of
              ``PeripheralHandManager``.

      Returns:
          A ``JSONResponse`` wrapping the value returned by the new manager's
          ``init()`` call.
      """
      global hand_manager
      if hand_manager is not None:
          hand_manager.close()
          # Forget the closed manager right away: if constructing the new one
          # fails below, the other handlers must see "not initialised" rather
          # than keep calling into an already-closed instance.
          hand_manager = None
      hand_manager = PeripheralHandManager(device_name, init_params)
      return JSONResponse(hand_manager.init())
  @router.get("/hand/start")
  def hand_start(train_id: int, db: Session = Depends(get_db)):
      """Start the hand peripheral for the train record ``train_id``.

      Args:
          train_id: Identifier passed to ``db_rep_train.retrieve_train``.
          db: Database session supplied by the ``get_db`` dependency.

      Returns:
          A ``JSONResponse`` with the "not initialised" message when no hand
          manager exists, otherwise with the value returned by
          ``hand_manager.start(train)``.
      """
      # Guard first, like the other /hand/* handlers: there is no point in
      # querying the database when there is no manager to start.
      if hand_manager is None:
          return JSONResponse({"msg": message["hand_peripheral_not_init"]})
      train = db_rep_train.retrieve_train(train_id, db)
      msg = hand_manager.start(train)
      return JSONResponse(msg)
  @router.get("/hand/stop")
  def hand_stop():
      """Stop the hand peripheral if a manager has been initialised.

      Returns:
          A ``JSONResponse`` with the value returned by ``hand_manager.stop()``,
          or with the "not initialised" message when there is no manager.
      """
      if hand_manager is not None:
          return JSONResponse(hand_manager.stop())
      not_ready = {"msg": message["hand_peripheral_not_init"]}
      return JSONResponse(not_ready)
  @router.get("/hand/status")
  def hand_status():
      """Report the current state of the hand peripheral.

      Returns:
          A ``JSONResponse`` with the value returned by
          ``hand_manager.status()``. When there is no manager, the body is
          ``is_connected: False`` plus the "not initialised" message.
      """
      if hand_manager is not None:
          return JSONResponse(hand_manager.status())
      not_ready = {
          "is_connected": False,
          "msg": message["hand_peripheral_not_init"],
      }
      return JSONResponse(not_ready)
  @router.get("/hand/close")
  def hand_close():
      """Close the hand peripheral and clear the module-level manager.

      Returns:
          A ``JSONResponse`` with the value returned by ``hand_manager.close()``,
          or with the "not initialised" message when there is no manager.
      """
      global hand_manager
      if hand_manager is None:
          return JSONResponse({"msg": message["hand_peripheral_not_init"]})
      msg = hand_manager.close()
      # Rebinding to None already drops the reference. A preceding
      # ``del hand_manager`` would briefly unbind the module global, and a
      # request handled on another worker thread in that window would fail
      # with NameError instead of seeing "not initialised".
      hand_manager = None
      return JSONResponse(msg)