"""
tracker.py — Robot Tracker v005

Camera capture and AprilTag detection run in a dedicated background thread,
so the asyncio event loop is never blocked. A watchdog loop restarts ffmpeg
automatically when the connection drops.
"""

import asyncio
import logging
import math
import subprocess
import threading
import time
from typing import Optional

import cv2
import numpy as np
from pupil_apriltags import Detector

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------

def ema(a: float, b: float, alpha: float) -> float:
    """Return one exponential-moving-average step from *a* toward *b*.

    ``alpha == 0`` keeps *a*, ``alpha == 1`` jumps straight to *b*.
    """
    return a + alpha * (b - a)


def ema_angle_deg(prev_deg: float, new_deg: float, alpha: float) -> float:
    """Return the EMA of two angles (degrees), computed on the unit circle.

    Averaging the sine/cosine components instead of the raw angles avoids
    the jump at the ±180° seam.
    """
    px = math.cos(math.radians(prev_deg))
    py = math.sin(math.radians(prev_deg))
    nx = math.cos(math.radians(new_deg))
    ny = math.sin(math.radians(new_deg))
    return math.degrees(math.atan2(ema(py, ny, alpha), ema(px, nx, alpha)))


def wrap_angle_deg(a: float) -> float:
    """Wrap an angle in degrees into the half-open interval [-180, 180)."""
    return (a + 180.0) % 360.0 - 180.0


# ---------------------------------------------------------------------------
# TagTracker
# ---------------------------------------------------------------------------

class TagTracker:
    """Track AprilTags seen in an RTSP stream decoded by an ffmpeg subprocess.

    Frames are read from ffmpeg's stdout as raw BGR24 images, AprilTags are
    detected, and a smoothed state (position, angle, velocity) is kept per
    tag id in ``self.state``.

    All blocking I/O runs in a dedicated producer thread. The event loop
    fetches results from an ``asyncio.Queue`` with ``maxsize=1``, so it
    always receives the most recent result available.
    """

    def __init__(self, cfg: dict):
        """Read settings from *cfg* and build the detector.

        *cfg* must contain the sections ``"camera"`` and ``"tracker"`` with
        the keys accessed below; only ``rtsp_transport``, ``reconnect_delay``
        and ``reconnect_max_attempts`` have defaults.
        """
        cam = cfg["camera"]
        trk = cfg["tracker"]

        self.rtsp_url = cam["rtsp_url"]
        self.width = cam["width"]
        self.height = cam["height"]
        self.rtsp_transport = cam.get("rtsp_transport", "udp")
        self.reconnect_delay = cam.get("reconnect_delay", 3.0)
        # 0 (the default) disables the reconnect limit.
        self.max_attempts = cam.get("reconnect_max_attempts", 0)
        # One raw frame: width * height pixels, 3 bytes each (bgr24).
        self.frame_bytes = self.width * self.height * 3

        self.alpha_pos = trk["ema_alpha_pos"]
        self.alpha_ang = trk["ema_alpha_ang"]
        self.alpha_vel = trk["ema_alpha_vel"]
        self.hold_seconds = trk["hold_seconds"]
        self.pred_seconds = trk["predict_seconds"]

        self.detector = Detector(
            families=trk["tag_family"],
            nthreads=trk["detector_threads"],
            quad_decimate=trk["quad_decimate"],
            quad_sigma=trk["quad_sigma"],
            refine_edges=trk["refine_edges"],
            decode_sharpening=trk["decode_sharpening"],
        )

        # Per-tag state, keyed by tag id; only touched by the producer thread.
        self.state: dict = {}

        # Queue between the producer thread and the asyncio consumer.
        # maxsize=1: the consumer always gets the newest result, older ones
        # are discarded.
        self._queue: Optional[asyncio.Queue] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._ffmpeg: Optional[subprocess.Popen] = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self, loop: asyncio.AbstractEventLoop) -> None:
        """Start the producer thread.

        Must be called with the running event loop. A call while the
        producer thread is still alive is ignored, because a second thread
        would share ``self.state`` and the ffmpeg handle with the first one.
        """
        if self._thread is not None and self._thread.is_alive():
            logger.warning("TagTracker läuft bereits — start() ignoriert")
            return

        self._loop = loop
        self._queue = asyncio.Queue(maxsize=1)
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._producer_loop,
            daemon=True,
            name="tracker-producer",
        )
        self._thread.start()
        logger.info("TagTracker gestartet")

    def stop(self) -> None:
        """Stop the producer thread and shut ffmpeg down cleanly."""
        self._stop_event.set()
        # Terminating ffmpeg unblocks a producer stuck in stdout.read().
        self._kill_ffmpeg()
        if self._thread:
            self._thread.join(timeout=5.0)
            if self._thread.is_alive():
                logger.warning(
                    "Producer-Thread wurde nicht innerhalb von 5 s beendet"
                )
            else:
                self._thread = None
        logger.info("TagTracker gestoppt")

    # ------------------------------------------------------------------
    # Async interface for the event loop
    # ------------------------------------------------------------------

    async def get_tags(self) -> list:
        """Wait for the next result from the producer thread.

        Returns an empty list if nothing arrives within 0.5 s (e.g. during
        a reconnect) or if the tracker has not been started yet.
        """
        if self._queue is None:
            return []
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=0.5)
        except asyncio.TimeoutError:
            return []

    # ------------------------------------------------------------------
    # Producer thread (runs entirely outside the event loop)
    # ------------------------------------------------------------------

    def _producer_loop(self) -> None:
        """Watchdog loop: run ffmpeg, read frames, detect tags, publish.

        ffmpeg is restarted after an error. ``max_attempts`` limits the
        number of *consecutive* failed attempts: the counter is reset as
        soon as a frame has been processed successfully, so a long-running
        tracker is not shut down by occasional, recovered dropouts.
        """
        attempts = 0
        while not self._stop_event.is_set():
            attempts += 1
            if self.max_attempts > 0 and attempts > self.max_attempts:
                logger.error("Maximale Reconnect-Versuche erreicht. Producer stoppt.")
                break

            logger.info(f"ffmpeg starten (Versuch {attempts})… URL: {self.rtsp_url}")
            proc = self._spawn_ffmpeg()
            if proc is None:
                self._stop_event.wait(self.reconnect_delay)
                continue

            self._ffmpeg = proc
            read_errors = 0
            try:
                while not self._stop_event.is_set():
                    raw = proc.stdout.read(self.frame_bytes)
                    if len(raw) != self.frame_bytes:
                        # Short read: the stream ended or was interrupted.
                        read_errors += 1
                        if read_errors >= 3:
                            logger.warning("Zu viele Frame-Lesefehler — Reconnect")
                            break
                        continue
                    read_errors = 0
                    tags = self._process_frame(raw)
                    self._enqueue(tags)
                    # A full frame went through: the connection works again.
                    attempts = 0
            except Exception as e:
                # Thread boundary: log with traceback and fall through to
                # the reconnect logic instead of killing the thread.
                logger.exception(f"Fehler im Producer-Thread: {e}")
            finally:
                self._kill_ffmpeg()
                self._close_stdout(proc)

            if not self._stop_event.is_set():
                logger.info(f"Reconnect in {self.reconnect_delay:.1f}s…")
                self._stop_event.wait(self.reconnect_delay)

    def _spawn_ffmpeg(self) -> Optional[subprocess.Popen]:
        """Launch ffmpeg decoding the RTSP stream to raw BGR24 on stdout.

        Returns the process handle, or ``None`` if ffmpeg could not be
        started (the reason is logged).
        """
        cmd = [
            "ffmpeg",
            "-rtsp_transport", self.rtsp_transport,
            "-fflags", "nobuffer",
            "-flags", "low_delay",
            "-probesize", "32",
            "-analyzeduration", "0",
            "-vsync", "0",
            "-i", self.rtsp_url,
            "-an", "-sn", "-dn",
            "-pix_fmt", "bgr24",
            "-f", "rawvideo",
            "-",
        ]
        try:
            return subprocess.Popen(
                cmd,
                # Do not let ffmpeg read (and compete for) the parent's stdin.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=self.frame_bytes,
            )
        except FileNotFoundError:
            logger.error("ffmpeg nicht gefunden — bitte installieren")
            return None
        except Exception as e:
            logger.error(f"ffmpeg-Start fehlgeschlagen: {e}")
            return None

    def _kill_ffmpeg(self) -> None:
        """Terminate the current ffmpeg process, escalating to kill.

        May be called from both ``stop()`` and the producer thread; the
        handle is taken over first so only one caller acts on the process.
        """
        proc = self._ffmpeg
        self._ffmpeg = None
        if proc is None or proc.poll() is not None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=3.0)
        except Exception:
            # Graceful shutdown failed or timed out: force it and reap the
            # child so it does not linger as a zombie.
            proc.kill()
            try:
                proc.wait(timeout=3.0)
            except subprocess.TimeoutExpired:
                logger.warning("ffmpeg reagiert nicht auf kill()")

    @staticmethod
    def _close_stdout(proc: subprocess.Popen) -> None:
        """Close the stdout pipe of *proc* (called by the reading thread)."""
        if proc.stdout is None:
            return
        try:
            proc.stdout.close()
        except OSError as e:
            logger.debug(f"ffmpeg-Pipe konnte nicht geschlossen werden: {e}")

    def _enqueue(self, tags: list) -> None:
        """Hand *tags* over to the event loop; an older result is replaced.

        Called from the producer thread, so the actual queue operation is
        scheduled via ``call_soon_threadsafe``. If the event loop is already
        closed there is no consumer left and the producer is asked to stop.
        """
        if self._queue is None or self._loop is None:
            return
        try:
            self._loop.call_soon_threadsafe(self._queue_put_nowait, tags)
        except RuntimeError:
            # Raised when the loop has been closed.
            logger.warning("Event-Loop geschlossen — Producer stoppt.")
            self._stop_event.set()

    def _queue_put_nowait(self, tags: list) -> None:
        """Put *tags* into the queue, dropping an older result if present.

        Runs in the event-loop thread.
        """
        if self._queue is None:
            return
        if self._queue.full():
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                pass  # a consumer took the item in the meantime
        try:
            self._queue.put_nowait(tags)
        except asyncio.QueueFull:
            pass  # best effort: the next frame will deliver a fresh result

    # ------------------------------------------------------------------
    # Frame processing (runs in the producer thread)
    # ------------------------------------------------------------------

    def _process_frame(self, raw: bytes) -> list:
        """Detect tags in one raw BGR24 frame and return the tag list.

        *raw* must contain exactly ``height * width * 3`` bytes. The result
        is one dict per tag that was measured within ``hold_seconds``.
        """
        frame = np.frombuffer(raw, dtype=np.uint8).reshape(
            (self.height, self.width, 3)
        )
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        detections = self.detector.detect(gray)
        now = time.time()

        for d in detections:
            self._update_tag_state(d, now)
        return self._collect_output(now)

    def _update_tag_state(self, d, now: float) -> None:
        """Fold one detection *d* into the smoothed state of its tag."""
        tag_id = int(d.tag_id)
        cx, cy = float(d.center[0]), float(d.center[1])
        # Heading from corner 0 to corner 1: 0° when that edge points toward
        # -y, +90° when it points toward +x.
        p0, p1 = d.corners[0], d.corners[1]
        theta_meas = float(np.degrees(
            np.arctan2(p1[0] - p0[0], -(p1[1] - p0[1]))
        ))

        s = self.state.get(tag_id)
        if s is None:
            # First sighting: start from the measurement, at rest.
            self.state[tag_id] = {
                "x": cx, "y": cy, "theta": theta_meas,
                "vx": 0.0, "vy": 0.0, "omega": 0.0,
                "last_meas": now, "last_pose_ts": now,
                "lx": cx, "ly": cy, "lt": theta_meas,
            }
            return

        x = ema(s["x"], cx, self.alpha_pos)
        y = ema(s["y"], cy, self.alpha_pos)
        th = ema_angle_deg(s["theta"], theta_meas, self.alpha_ang)

        dt = now - s["last_pose_ts"]
        if dt > 1e-3:  # skip the velocity update for (near) zero intervals
            s["vx"] = ema(s["vx"], (x - s["lx"]) / dt, self.alpha_vel)
            s["vy"] = ema(s["vy"], (y - s["ly"]) / dt, self.alpha_vel)
            s["omega"] = ema(
                s["omega"],
                wrap_angle_deg(th - s["lt"]) / dt,
                self.alpha_vel,
            )

        s.update({
            "x": x, "y": y, "theta": th,
            "last_meas": now, "last_pose_ts": now,
            "lx": x, "ly": y, "lt": th,
        })

    def _collect_output(self, now: float) -> list:
        """Build the output list and drop tags not seen for too long.

        Tags measured within ``pred_seconds`` are extrapolated with their
        velocity; up to ``hold_seconds`` the last pose is held; older tags
        are removed from ``self.state``.
        """
        out = []
        to_del = []
        for tag_id, s in self.state.items():
            age = now - s["last_meas"]
            if age > self.hold_seconds:
                to_del.append(tag_id)
                continue

            if age <= self.pred_seconds:
                x = s["x"] + s["vx"] * age
                y = s["y"] + s["vy"] * age
                th = wrap_angle_deg(s["theta"] + s["omega"] * age)
            else:
                x, y, th = s["x"], s["y"], s["theta"]

            out.append({
                "id": tag_id,
                "x": round(x, 2),
                "y": round(y, 2),
                "theta": round(th, 2),
                "age": round(age, 3),
                "vx": round(s["vx"], 2),
                "vy": round(s["vy"], 2),
                "omega": round(s["omega"], 2),
            })

        # Deleting after the loop avoids mutating the dict while iterating.
        for tag_id in to_del:
            del self.state[tag_id]

        return out