""" app.py — Robot Tracker v009 - Encoder-PID: Server schickt mm/s statt PWM - Dead Reckoning: Position schätzen wenn Tag unsichtbar - Sanfteres Drehen mit Encoder-Feedback - NavLogger mit Geschwindigkeitsdaten """ import asyncio import json import logging import math import time from pathlib import Path import yaml from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from tracker import TagTracker from calibration import Calibration from collision import compute_avoidance, force_to_speeds, reset_robot from navlogger import NavLogger logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", datefmt="%H:%M:%S", ) logger = logging.getLogger("app") _cfg_path = Path(__file__).parent / "config.yaml" with open(_cfg_path) as f: cfg = yaml.safe_load(f) server_cfg = cfg["server"] robot_cfg = cfg["robots"] cal_cfg = cfg["calibration"] obs_cfg = cfg["obstacles"] hw_cfg = cfg["robot_hardware"] broadcast_interval = 1.0 / server_cfg.get("broadcast_hz", 30) ROBOT_TAG_IDS = robot_cfg["robot_tag_ids"] GOAL_RADIUS = robot_cfg["goal_radius"] SPEED_BASE = robot_cfg["speed_base"] SPEED_TURN = robot_cfg["speed_turn"] SPEED_MIN = robot_cfg["speed_min"] SPEED_MAX = robot_cfg["speed_max"] ANGLE_THRESHOLD = robot_cfg["angle_threshold"] TCP_PORT = robot_cfg["tcp_port"] DR_TIMEOUT = robot_cfg["dead_reckoning_timeout"] SAFE_DISTANCE = obs_cfg["safe_distance"] REPULSION = obs_cfg["repulsion_strength"] OBSTACLE_IDS = set(obs_cfg["obstacle_tag_ids"]) REFERENCE_IDS = set(int(k) for k in cal_cfg["reference_tags"].keys()) MM_PER_PULSE = hw_cfg["mm_per_pulse"] AXLE_WIDTH = hw_cfg["axle_width_mm"] STUCK_TIMEOUT = 1.5 STUCK_THRESHOLD = 15 robots: dict = {} # robot_id → {"writer": ..., "enc_l": 0, "enc_r": 0, "speed_l": 0, "speed_r": 0} goal: dict = {} calibration = Calibration(cal_cfg["reference_tags"]) nav_logger = NavLogger("logs") # Dead Reckoning Zustand robot_dr: dict = {} # robot_id → {"x": mm, "y": mm, "theta": deg, "enc_l": int, "enc_r": int, "time": float} robot_last_pos: dict = {} app = FastAPI(title="Robot Tracker v009") app.mount("/static", StaticFiles(directory="static"), name="static") tracker = TagTracker(cfg) browser_clients: set[WebSocket] = set() # --------------------------------------------------------------------------- # TCP Robot-Server # --------------------------------------------------------------------------- async def handle_robot(reader, writer) -> None: robot_id = None addr = writer.get_extra_info("peername") logger.info(f"TCP Verbindung von {addr}") try: while True: line = await reader.readline() if not line: break data = json.loads(line.decode().strip()) if robot_id is None: robot_id = data.get("robot_id") robots[robot_id] = { "writer": writer, "enc_l": 0, "enc_r": 0, "speed_l": 0.0, "speed_r": 0.0, "pwm_l": 0, "pwm_r": 0, "last_enc_l": 0, "last_enc_r": 0, "last_telem_time": time.time(), } logger.info(f"Roboter {robot_id} verbunden (v={data.get('version','?')})") if "enc_l" in data and robot_id: r = robots[robot_id] r["enc_l"] = data["enc_l"] r["enc_r"] = data["enc_r"] r["speed_l"] = data.get("speed_l", 0.0) r["speed_r"] = data.get("speed_r", 0.0) r["pwm_l"] = data.get("pwm_l", 0) r["pwm_r"] = data.get("pwm_r", 0) r["last_telem_time"] = time.time() # Dead Reckoning aktualisieren _update_dead_reckoning(robot_id, data) except Exception as e: logger.warning(f"Roboter {robot_id} Fehler: {e}") finally: if robot_id and robot_id in robots: del robots[robot_id] reset_robot(robot_id) # Kraft-Glättungs-Zustand zurücksetzen writer.close() logger.info(f"Roboter {robot_id} getrennt") def _update_dead_reckoning(robot_id, telem: dict): """Aktualisiert Dead-Reckoning Position aus Encoder-Telemetrie.""" if robot_id not in robot_dr: return dr = robot_dr[robot_id] new_enc_l = telem["enc_l"] new_enc_r = telem["enc_r"] delta_l = (new_enc_l - dr["enc_l"]) * MM_PER_PULSE delta_r = (new_enc_r - dr["enc_r"]) * MM_PER_PULSE dr["enc_l"] = new_enc_l dr["enc_r"] = new_enc_r # Richtung aus letztem Befehl if robots.get(robot_id, {}).get("speed_l", 0) < 0: delta_l = -delta_l if robots.get(robot_id, {}).get("speed_r", 0) < 0: delta_r = -delta_r dist = (delta_l + delta_r) / 2.0 d_theta = math.degrees((delta_r - delta_l) / AXLE_WIDTH) theta_rad = math.radians(dr["theta"] + d_theta / 2) dr["x"] += dist * math.sin(theta_rad) dr["y"] += dist * -math.cos(theta_rad) dr["theta"] = (dr["theta"] + d_theta + 180) % 360 - 180 dr["time"] = time.time() async def send_to_robot(robot_id: int, cmd: dict) -> None: r = robots.get(robot_id) if r: try: msg = json.dumps({ "speed_l": round(cmd["speed_l"], 1), "speed_r": round(cmd["speed_r"], 1), }) + "\n" r["writer"].write(msg.encode()) await r["writer"].drain() except Exception as e: logger.warning(f"Senden an Roboter {robot_id} fehlgeschlagen: {e}") # --------------------------------------------------------------------------- # Regler-Loop # --------------------------------------------------------------------------- def check_stuck(robot_id, rx, ry) -> bool: now = time.time() if robot_id not in robot_last_pos: robot_last_pos[robot_id] = (rx, ry, now) return False lx, ly, lt = robot_last_pos[robot_id] dist_moved = math.sqrt((rx - lx)**2 + (ry - ly)**2) if dist_moved > STUCK_THRESHOLD: robot_last_pos[robot_id] = (rx, ry, now) return False if now - lt > STUCK_TIMEOUT: logger.warning(f"Roboter {robot_id} steckt fest!") robot_last_pos[robot_id] = (rx, ry, now) return True return False async def controller_loop() -> None: while True: await asyncio.sleep(0.05) # 20 Hz statt 10 Hz — schnellere Reaktion if not goal: continue for robot_tag_id in ROBOT_TAG_IDS: if robot_tag_id not in robots: continue # Position bestimmen — Kamera oder Dead Reckoning tag_state = tracker.state.get(robot_tag_id) use_dr = False if tag_state: age = time.time() - tag_state.get("last_meas", 0) if age > DR_TIMEOUT and robot_tag_id in robot_dr: use_dr = True dr = robot_dr[robot_tag_id] rx, ry, rtheta = dr["x"], dr["y"], dr["theta"] logger.debug(f"Dead Reckoning für Roboter {robot_tag_id}") else: if calibration.calibrated: rx, ry = calibration.pixel_to_world(tag_state["x"], tag_state["y"]) else: rx, ry = tag_state["x"], tag_state["y"] rtheta = tag_state["theta"] # Dead Reckoning mit Kamera-Position initialisieren robot_dr[robot_tag_id] = { "x": rx, "y": ry, "theta": rtheta, "enc_l": robots[robot_tag_id]["enc_l"], "enc_r": robots[robot_tag_id]["enc_r"], "time": time.time(), } else: if robot_tag_id in robot_dr: use_dr = True dr = robot_dr[robot_tag_id] rx, ry, rtheta = dr["x"], dr["y"], dr["theta"] else: continue # Zieldistanz dx = goal["x"] - rx dy = goal["y"] - ry dist = math.sqrt(dx**2 + dy**2) if dist < GOAL_RADIUS: logger.info(f"Roboter {robot_tag_id} Ziel erreicht! {'(DR)' if use_dr else ''}") goal.clear() robot_last_pos.pop(robot_tag_id, None) await send_to_robot(robot_tag_id, {"speed_l": 0.0, "speed_r": 0.0}) continue # Anti-Stecken if check_stuck(robot_tag_id, rx, ry): await send_to_robot(robot_tag_id, {"speed_l": -80.0, "speed_r": -80.0}) await asyncio.sleep(0.4) continue # Hindernisse sammeln obstacles = [] for tid, state in tracker.state.items(): if tid == robot_tag_id or tid in REFERENCE_IDS or tid not in OBSTACLE_IDS: continue if calibration.calibrated: ox, oy = calibration.pixel_to_world(state["x"], state["y"]) else: ox, oy = state["x"], state["y"] obstacles.append({"id": tid, "x": ox, "y": oy}) # Potentialfeld fx, fy = compute_avoidance(rx, ry, goal["x"], goal["y"], obstacles, SAFE_DISTANCE, REPULSION) # Geschwindigkeitsbefehl cmd = force_to_speeds( fx, fy, rtheta, SPEED_BASE, SPEED_TURN, SPEED_MIN, SPEED_MAX, ANGLE_THRESHOLD, AXLE_WIDTH, robot_id=robot_tag_id, dist_to_goal=dist, ) # Logging target_angle = math.degrees(math.atan2(fx, -fy)) angle_error = (target_angle - rtheta + 180) % 360 - 180 nav_logger.log(rx, ry, rtheta, goal["x"], goal["y"], cmd, obstacles, angle_error) await send_to_robot(robot_tag_id, cmd) # --------------------------------------------------------------------------- # Lifecycle # --------------------------------------------------------------------------- @app.on_event("startup") async def startup() -> None: loop = asyncio.get_running_loop() tracker.start(loop) tcp_server = await asyncio.start_server(handle_robot, "0.0.0.0", TCP_PORT) asyncio.create_task(tcp_server.serve_forever(), name="tcp-server") asyncio.create_task(broadcast_loop(), name="broadcast") asyncio.create_task(controller_loop(), name="controller") logger.info("Server v009 gestartet — Encoder-PID aktiv") @app.on_event("shutdown") async def shutdown() -> None: tracker.stop() nav_logger.close() logger.info("Server gestoppt") # --------------------------------------------------------------------------- # Broadcast # --------------------------------------------------------------------------- async def broadcast_loop() -> None: while True: tags = await tracker.get_tags() if not calibration.calibrated: calibration.update(tags) tags_out = [] obstacles_out = [] for t in tags: t2 = dict(t) if calibration.calibrated and t["id"] not in REFERENCE_IDS: x_mm, y_mm = calibration.pixel_to_world(t["x"], t["y"]) t2["x_mm"] = round(x_mm, 1) t2["y_mm"] = round(y_mm, 1) # Geschwindigkeit aus Roboter-Telemetrie hinzufügen if t["id"] in robots: r = robots[t["id"]] t2["speed_actual_l"] = round(r.get("speed_l", 0), 1) t2["speed_actual_r"] = round(r.get("speed_r", 0), 1) t2["speed_mm_s"] = round( (abs(r.get("speed_l", 0)) + abs(r.get("speed_r", 0))) / 2, 1 ) tags_out.append(t2) if t["id"] in OBSTACLE_IDS: obstacles_out.append({ "id": t["id"], "x": t["x"], "y": t["y"], "x_mm": t2.get("x_mm"), "y_mm": t2.get("y_mm"), }) payload = { "tags": tags_out, "goal": goal if goal else None, "calibrated": calibration.calibrated, "robots_connected": list(robots.keys()), "field_corners_px": calibration.get_field_corners_px(), "obstacles": obstacles_out, } dead: set[WebSocket] = set() for ws in list(browser_clients): try: await ws.send_json(payload) except Exception: dead.add(ws) browser_clients.difference_update(dead) await asyncio.sleep(broadcast_interval) # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @app.get("/") def wizard(): return FileResponse("static/wizard.html") @app.get("/view") def view(): return FileResponse("static/view.html") @app.websocket("/ws") async def ws_endpoint(ws: WebSocket) -> None: await ws.accept() browser_clients.add(ws) logger.info(f"Browser verbunden ({len(browser_clients)} gesamt)") try: while True: msg = await ws.receive_text() data = json.loads(msg) if data.get("type") == "goal": gx, gy = data["x"], data["y"] if calibration.calibrated: gx, gy = calibration.pixel_to_world(gx, gy) goal["x"] = gx goal["y"] = gy logger.info(f"Neues Ziel: x={gx:.1f} y={gy:.1f} {'mm' if calibration.calibrated else 'px'}") elif data.get("type") == "calibrate": tag_list = [{"id": tid, "x": s["x"], "y": s["y"]} for tid, s in tracker.state.items()] success = calibration.update(tag_list) await ws.send_json({"type": "calibration_result", "success": success}) except (WebSocketDisconnect, Exception): browser_clients.discard(ws) logger.info(f"Browser getrennt ({len(browser_clients)} verbleibend)")