HoloDeck_Robot_System/tracker.py

318 lines
11 KiB
Python

"""
tracker.py — Robot Tracker v005
Kamera-Capture und AprilTag-Erkennung laufen in einem dedizierten
Hintergrund-Thread. Der asyncio Event-Loop wird nie blockiert.
Ein Watchdog-Loop startet ffmpeg bei Verbindungsabbruch automatisch neu.
"""
import asyncio
import logging
import math
import subprocess
import threading
import time
from typing import Optional
import cv2
import numpy as np
from pupil_apriltags import Detector
# Module-level logger used by TagTracker for lifecycle, reconnect and error messages.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
def ema(a: float, b: float, alpha: float) -> float:
    """Move ``a`` towards ``b`` by the fraction ``alpha`` (one EMA step).

    ``alpha == 0`` keeps ``a`` unchanged; ``alpha == 1`` jumps straight to ``b``.
    """
    step = b - a
    return a + step * alpha
def ema_angle_deg(prev_deg: float, new_deg: float, alpha: float) -> float:
    """Smooth an angle in degrees with one EMA step on the unit circle.

    Both angles are mapped to (cos, sin) points, blended component-wise
    with :func:`ema` and mapped back with ``atan2``. Blending e.g. 179 deg
    and -179 deg therefore stays near +/-180 deg instead of swinging through
    0 deg. The result lies in [-180, 180].
    """
    prev_rad = math.radians(prev_deg)
    new_rad = math.radians(new_deg)
    cos_mix = ema(math.cos(prev_rad), math.cos(new_rad), alpha)
    sin_mix = ema(math.sin(prev_rad), math.sin(new_rad), alpha)
    return math.degrees(math.atan2(sin_mix, cos_mix))
def wrap_angle_deg(a: float) -> float:
    """Map any angle in degrees into [-180, 180) (up to float rounding)."""
    shifted = a + 180.0
    return shifted % 360.0 - 180.0
# ---------------------------------------------------------------------------
# TagTracker
# ---------------------------------------------------------------------------
class TagTracker:
    """
    Read frames from an RTSP stream via an ffmpeg subprocess, detect
    AprilTags and maintain a smoothed state vector per tag
    (position, angle, velocity).

    All blocking I/O and detection run in a dedicated producer thread.
    The event loop fetches results from an ``asyncio.Queue`` with
    ``maxsize=1``, so it always receives the most recent result; older
    results are discarded.
    """

    def __init__(self, cfg: dict):
        """
        Args:
            cfg: Configuration dict with two sections.
                ``"camera"``: ``rtsp_url``, ``width`` and ``height``, plus
                optional ``rtsp_transport``, ``reconnect_delay`` and
                ``reconnect_max_attempts``.
                ``"tracker"``: EMA factors, hold/predict windows and the
                AprilTag detector settings.
        """
        cam = cfg["camera"]
        trk = cfg["tracker"]
        self.rtsp_url = cam["rtsp_url"]
        self.width = cam["width"]
        self.height = cam["height"]
        self.rtsp_transport = cam.get("rtsp_transport", "udp")
        self.reconnect_delay = cam.get("reconnect_delay", 3.0)
        # 0 = retry forever. Otherwise: the maximum number of consecutive
        # connection attempts that fail to deliver a frame.
        self.max_attempts = cam.get("reconnect_max_attempts", 0)
        # One raw bgr24 frame is 3 bytes per pixel.
        self.frame_bytes = self.width * self.height * 3
        self.alpha_pos = trk["ema_alpha_pos"]
        self.alpha_ang = trk["ema_alpha_ang"]
        self.alpha_vel = trk["ema_alpha_vel"]
        self.hold_seconds = trk["hold_seconds"]
        self.pred_seconds = trk["predict_seconds"]
        self.detector = Detector(
            families=trk["tag_family"],
            nthreads=trk["detector_threads"],
            quad_decimate=trk["quad_decimate"],
            quad_sigma=trk["quad_sigma"],
            refine_edges=trk["refine_edges"],
            decode_sharpening=trk["decode_sharpening"],
        )
        # Smoothed per-tag state keyed by tag id. It is updated by
        # _process_frame in the producer thread. Timestamps in it are
        # time.monotonic() values.
        self.state: dict = {}
        # Queue between the producer thread and the asyncio consumer.
        # maxsize=1: the consumer always gets the newest result, and older
        # ones are dropped. It is created in start() so that it belongs to
        # the running loop.
        self._queue: Optional[asyncio.Queue] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._ffmpeg: Optional[subprocess.Popen] = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def start(self, loop: asyncio.AbstractEventLoop) -> None:
        """Start the producer thread.

        Must be called from within the running event loop ``loop``. The
        result queue is created here, and results are delivered to it via
        ``loop.call_soon_threadsafe``. While a producer thread is still
        alive, further calls only log a warning.
        """
        if self._thread is not None and self._thread.is_alive():
            logger.warning("TagTracker läuft bereits")
            return
        self._loop = loop
        self._queue = asyncio.Queue(maxsize=1)
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._producer_loop, daemon=True, name="tracker-producer"
        )
        self._thread.start()
        logger.info("TagTracker gestartet")

    def stop(self) -> None:
        """Stop the producer thread and terminate ffmpeg.

        Setting the stop event ends the read loop. Terminating ffmpeg makes
        a ``read()`` that is blocked waiting for the next frame return.
        """
        self._stop_event.set()
        self._kill_ffmpeg()
        if self._thread is not None:
            self._thread.join(timeout=5.0)
            if self._thread.is_alive():
                logger.warning("Producer-Thread nicht rechtzeitig beendet")
        logger.info("TagTracker gestoppt")

    # ------------------------------------------------------------------
    # Async interface for the event loop
    # ------------------------------------------------------------------
    async def get_tags(self) -> list:
        """
        Wait for the next result from the producer thread.

        Returns an empty list if nothing arrives within 0.5 s (e.g. during
        a reconnect) or if start() has not been called yet.
        """
        if self._queue is None:
            return []
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=0.5)
        except asyncio.TimeoutError:
            return []

    # ------------------------------------------------------------------
    # Producer thread (runs entirely outside the event loop)
    # ------------------------------------------------------------------
    def _producer_loop(self) -> None:
        """
        Watchdog loop: spawn ffmpeg, read frames, detect tags and publish
        each result to the queue.

        If ffmpeg fails to start, reaches EOF or an exception occurs, it is
        torn down and restarted after ``reconnect_delay`` seconds. If
        ``max_attempts`` > 0, it bounds the number of consecutive attempts
        that fail to deliver a frame.
        """
        attempts = 0
        while not self._stop_event.is_set():
            attempts += 1
            if self.max_attempts > 0 and attempts > self.max_attempts:
                logger.error("Maximale Reconnect-Versuche erreicht. Producer stoppt.")
                break
            logger.info(f"ffmpeg starten (Versuch {attempts})… URL: {self.rtsp_url}")
            proc = self._spawn_ffmpeg()
            if proc is None:
                self._stop_event.wait(self.reconnect_delay)
                continue
            self._ffmpeg = proc
            read_errors = 0
            try:
                # NOTE(review): if ffmpeg stalls without exiting, read() blocks
                # indefinitely and no reconnect is triggered. Consider an ffmpeg
                # RTSP socket-timeout option. TODO: confirm for the deployed
                # ffmpeg version.
                while not self._stop_event.is_set():
                    raw = proc.stdout.read(self.frame_bytes)
                    if len(raw) != self.frame_bytes:
                        # On a blocking buffered pipe, a short read means EOF
                        # (ffmpeg exited or was terminated).
                        read_errors += 1
                        if read_errors >= 3:
                            logger.warning("Zu viele Frame-Lesefehler — Reconnect")
                            break
                        continue
                    read_errors = 0
                    # The connection works. Only consecutive failures count
                    # towards max_attempts.
                    attempts = 0
                    tags = self._process_frame(raw)
                    self._enqueue(tags)
            except Exception as e:
                logger.exception(f"Fehler im Producer-Thread: {e}")
            finally:
                # Stop the process *this* iteration started (not whatever
                # self._ffmpeg refers to now) and release its pipe, so no
                # ffmpeg process or file descriptor leaks across reconnects.
                self._kill_ffmpeg(proc)
                if proc.stdout is not None:
                    proc.stdout.close()
            if not self._stop_event.is_set():
                logger.info(f"Reconnect in {self.reconnect_delay:.1f}s…")
                self._stop_event.wait(self.reconnect_delay)

    def _spawn_ffmpeg(self) -> Optional[subprocess.Popen]:
        """Start ffmpeg decoding the RTSP stream to raw bgr24 frames on stdout.

        Returns the process, or ``None`` if it could not be started.
        """
        cmd = [
            "ffmpeg",
            "-rtsp_transport", self.rtsp_transport,
            "-fflags", "nobuffer",
            "-flags", "low_delay",
            "-probesize", "32",
            "-analyzeduration", "0",
            "-vsync", "0",
            "-i", self.rtsp_url,
            "-an", "-sn", "-dn",
            "-pix_fmt", "bgr24",
            "-f", "rawvideo",
            "-",
        ]
        try:
            return subprocess.Popen(
                cmd,
                # ffmpeg reads interactive key commands from stdin. Give it
                # /dev/null so it does not interact with the parent's terminal.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=self.frame_bytes,
            )
        except FileNotFoundError:
            logger.error("ffmpeg nicht gefunden — bitte installieren")
            return None
        except Exception as e:
            logger.error(f"ffmpeg-Start fehlgeschlagen: {e}")
            return None

    def _kill_ffmpeg(self, proc: Optional[subprocess.Popen] = None) -> None:
        """Terminate an ffmpeg process and reap it.

        Args:
            proc: Process to stop. Defaults to the current ``self._ffmpeg``.
                ``self._ffmpeg`` is cleared only if it still refers to this
                process. This prevents a concurrent caller from dropping the
                reference to a newer process.

        Calls ``terminate()`` and waits up to 3 s. If the process is still
        running, it calls ``kill()`` and waits again so no zombie is left.
        Errors are logged, not raised.
        """
        if proc is None:
            proc = self._ffmpeg
        if proc is not None and proc.poll() is None:
            try:
                proc.terminate()
                proc.wait(timeout=3.0)
            except subprocess.TimeoutExpired:
                proc.kill()
                try:
                    proc.wait(timeout=3.0)
                except subprocess.TimeoutExpired:
                    logger.warning("ffmpeg lässt sich nicht beenden")
            except OSError as e:
                # The process most likely exited between poll() and terminate().
                logger.debug(f"ffmpeg bereits beendet: {e}")
        if proc is not None and self._ffmpeg is proc:
            self._ffmpeg = None

    def _enqueue(self, tags: list) -> None:
        """Hand a result to the event loop (called from the producer thread).

        asyncio.Queue is not thread-safe, so the put is scheduled on the loop
        via ``call_soon_threadsafe``. If the loop is already closed, nobody can
        consume results any more, so the producer is told to stop.
        """
        if self._queue is None or self._loop is None:
            return
        try:
            self._loop.call_soon_threadsafe(self._queue_put_nowait, tags)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError on a closed loop.
            logger.warning("Event-Loop geschlossen — Producer stoppt.")
            self._stop_event.set()

    def _queue_put_nowait(self, tags: list) -> None:
        """Replace any pending result with ``tags`` (runs in the loop thread)."""
        if self._queue is None:
            return
        if self._queue.full():
            try:
                # Drop the stale result the consumer has not picked up yet.
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        try:
            self._queue.put_nowait(tags)
        except asyncio.QueueFull:
            pass

    # ------------------------------------------------------------------
    # Frame processing (runs in the producer thread)
    # ------------------------------------------------------------------
    def _process_frame(self, raw: bytes) -> list:
        """Detect tags in one raw bgr24 frame and update the per-tag state.

        Returns one dict per tag measured within the last ``hold_seconds``.
        - Tags measured within ``pred_seconds`` are extrapolated by their
          smoothed velocities over their age.
        - Older ones report their last smoothed pose.
        - Tags not measured for longer than ``hold_seconds`` are removed
          from ``self.state``.
        """
        frame = np.frombuffer(raw, dtype=np.uint8).reshape(
            (self.height, self.width, 3)
        )
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        detections = self.detector.detect(gray)
        # Monotonic clock: wall-clock steps (e.g. NTP sync) must not corrupt
        # dt, the velocities or the tag age.
        now = time.monotonic()
        for d in detections:
            tag_id = int(d.tag_id)
            cx, cy = float(d.center[0]), float(d.center[1])
            # Heading in degrees from the edge corner[0] -> corner[1],
            # computed as atan2(dx, -dy).
            p0, p1 = d.corners[0], d.corners[1]
            theta_meas = float(np.degrees(
                np.arctan2(p1[0] - p0[0], -(p1[1] - p0[1]))
            ))
            if tag_id not in self.state:
                self.state[tag_id] = {
                    "x": cx, "y": cy, "theta": theta_meas,
                    "vx": 0.0, "vy": 0.0, "omega": 0.0,
                    "last_meas": now, "last_pose_ts": now,
                    "lx": cx, "ly": cy, "lt": theta_meas,
                }
            else:
                s = self.state[tag_id]
                x = ema(s["x"], cx, self.alpha_pos)
                y = ema(s["y"], cy, self.alpha_pos)
                th = ema_angle_deg(s["theta"], theta_meas, self.alpha_ang)
                dt = now - s["last_pose_ts"]
                # Skip the velocity update for near-zero dt to avoid huge spikes.
                if dt > 1e-3:
                    s["vx"] = ema(s["vx"], (x - s["lx"]) / dt, self.alpha_vel)
                    s["vy"] = ema(s["vy"], (y - s["ly"]) / dt, self.alpha_vel)
                    s["omega"] = ema(s["omega"], wrap_angle_deg(th - s["lt"]) / dt, self.alpha_vel)
                s.update({
                    "x": x, "y": y, "theta": th,
                    "last_meas": now, "last_pose_ts": now,
                    "lx": x, "ly": y, "lt": th,
                })
        out = []
        to_del = []
        for tag_id, s in self.state.items():
            age = now - s["last_meas"]
            if age <= self.hold_seconds:
                if age <= self.pred_seconds:
                    x = s["x"] + s["vx"] * age
                    y = s["y"] + s["vy"] * age
                    th = wrap_angle_deg(s["theta"] + s["omega"] * age)
                else:
                    x, y, th = s["x"], s["y"], s["theta"]
                out.append({
                    "id": tag_id,
                    "x": round(x, 2),
                    "y": round(y, 2),
                    "theta": round(th, 2),
                    "age": round(age, 3),
                    "vx": round(s["vx"], 2),
                    "vy": round(s["vy"], 2),
                    "omega": round(s["omega"], 2),
                })
            else:
                to_del.append(tag_id)
        # Deleted after the loop: a dict must not change size while iterated.
        for tag_id in to_del:
            del self.state[tag_id]
        return out