318 lines
11 KiB
Python
318 lines
11 KiB
Python
"""
|
|
tracker.py — Robot Tracker v005
|
|
Kamera-Capture und AprilTag-Erkennung laufen in einem dedizierten
|
|
Hintergrund-Thread. Der asyncio Event-Loop wird nie blockiert.
|
|
Ein Watchdog-Loop startet ffmpeg bei Verbindungsabbruch automatisch neu.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import math
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from pupil_apriltags import Detector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
|
|
|
|
def ema(a: float, b: float, alpha: float) -> float:
    """Return one exponential-moving-average step from ``a`` towards ``b``.

    ``alpha`` is the blend weight of the new sample: 0 keeps ``a``
    unchanged, 1 jumps straight to ``b``.
    """
    delta = b - a
    return a + alpha * delta
|
|
|
|
|
|
def ema_angle_deg(prev_deg: float, new_deg: float, alpha: float) -> float:
    """Return one EMA step between two headings, blended on the unit circle.

    Both angles are converted to unit vectors, the vectors are blended with
    weight ``alpha`` (same interpolation as ``ema``), and the direction of
    the blended vector is returned. This avoids the jump at +/-180 degrees
    that a plain scalar EMA would produce.

    Args:
        prev_deg: Previous (smoothed) heading in degrees.
        new_deg: Newly measured heading in degrees.
        alpha: Blend weight of the new measurement (0 keeps the previous
            heading, 1 takes the new one).

    Returns:
        The smoothed heading in degrees, as produced by ``atan2`` (range
        -180 to 180). In the degenerate case described below the result is
        wrapped into [-180, 180).
    """
    prev_rad = math.radians(prev_deg)
    new_rad = math.radians(new_deg)
    px, py = math.cos(prev_rad), math.sin(prev_rad)
    nx, ny = math.cos(new_rad), math.sin(new_rad)

    # Linear blend of the two unit vectors.
    x = px + alpha * (nx - px)
    y = py + alpha * (ny - py)

    # When the headings are (nearly) opposite, the blended vector collapses
    # to ~(0, 0) and atan2 would return a direction determined purely by
    # floating-point noise. Fall back to interpolating along the shortest arc.
    if math.hypot(x, y) < 1e-9:
        step = (new_deg - prev_deg + 180.0) % 360.0 - 180.0
        return (prev_deg + alpha * step + 180.0) % 360.0 - 180.0

    return math.degrees(math.atan2(y, x))
|
|
|
|
|
|
def wrap_angle_deg(a: float) -> float:
    """Wrap an angle in degrees into the half-open interval [-180, 180)."""
    shifted = a + 180.0
    return shifted % 360.0 - 180.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# TagTracker
# ---------------------------------------------------------------------------
|
|
|
|
class TagTracker:
    """
    Track AprilTags seen in an RTSP camera stream.

    Frames are read as raw BGR24 from an ffmpeg subprocess, AprilTags are
    detected in each frame, and a smoothed state (position, heading,
    velocities) is maintained per tag id.

    All blocking I/O and detection run in a dedicated producer thread.
    Results are handed to the event loop through an ``asyncio.Queue`` with
    ``maxsize=1``; an older, unconsumed result is replaced by the newest one.
    """

    def __init__(self, cfg: dict):
        """Read camera and tracker settings from ``cfg`` and build the detector.

        Args:
            cfg: Configuration mapping with a ``"camera"`` section
                (``rtsp_url``, ``width``, ``height`` and the optional
                ``rtsp_transport``, ``reconnect_delay``,
                ``reconnect_max_attempts``) and a ``"tracker"`` section
                (EMA weights, hold/predict windows, detector parameters).

        Raises:
            KeyError: If a required configuration key is missing.
        """
        cam = cfg["camera"]
        trk = cfg["tracker"]

        self.rtsp_url = cam["rtsp_url"]
        self.width = cam["width"]
        self.height = cam["height"]
        self.rtsp_transport = cam.get("rtsp_transport", "udp")
        self.reconnect_delay = cam.get("reconnect_delay", 3.0)
        # 0 (the default) means: retry without limit.
        self.max_attempts = cam.get("reconnect_max_attempts", 0)

        # One raw frame is width * height pixels with 3 bytes each (bgr24).
        self.frame_bytes = self.width * self.height * 3

        self.alpha_pos = trk["ema_alpha_pos"]
        self.alpha_ang = trk["ema_alpha_ang"]
        self.alpha_vel = trk["ema_alpha_vel"]
        self.hold_seconds = trk["hold_seconds"]
        self.pred_seconds = trk["predict_seconds"]

        self.detector = Detector(
            families=trk["tag_family"],
            nthreads=trk["detector_threads"],
            quad_decimate=trk["quad_decimate"],
            quad_sigma=trk["quad_sigma"],
            refine_edges=trk["refine_edges"],
            decode_sharpening=trk["decode_sharpening"],
        )

        # Per-tag smoothed state, keyed by tag id.
        self.state: dict = {}

        # Queue between the producer thread and the asyncio consumer.
        # maxsize=1: the consumer always gets the newest result,
        # older ones are discarded.
        self._queue: Optional[asyncio.Queue] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._ffmpeg: Optional[subprocess.Popen] = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self, loop: asyncio.AbstractEventLoop) -> None:
        """Start the producer thread.

        Must be called with the running event loop that will later await
        ``get_tags``. Calling it while a producer thread is still alive is
        ignored, so that two threads never mutate ``self.state`` at once.
        """
        if self._thread is not None and self._thread.is_alive():
            logger.warning("TagTracker läuft bereits — start() ignoriert")
            return

        self._loop = loop
        self._queue = asyncio.Queue(maxsize=1)
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._producer_loop, daemon=True, name="tracker-producer"
        )
        self._thread.start()
        logger.info("TagTracker gestartet")

    def stop(self) -> None:
        """Stop the producer thread and shut ffmpeg down."""
        self._stop_event.set()
        # Killing ffmpeg unblocks a producer that is waiting in stdout.read().
        self._kill_ffmpeg()

        thread = self._thread
        if thread is not None:
            thread.join(timeout=5.0)
            if thread.is_alive():
                logger.warning("Producer-Thread wurde nicht innerhalb von 5 s beendet")
            else:
                self._thread = None
        logger.info("TagTracker gestoppt")

    # ------------------------------------------------------------------
    # Async interface for the event loop
    # ------------------------------------------------------------------

    async def get_tags(self) -> list:
        """
        Wait for the next result from the producer thread.

        Returns an empty list if the tracker has not been started or if
        nothing arrives within 0.5 s (e.g. during a reconnect).
        """
        if self._queue is None:
            return []
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=0.5)
        except asyncio.TimeoutError:
            return []

    # ------------------------------------------------------------------
    # Producer thread (runs entirely outside the event loop)
    # ------------------------------------------------------------------

    def _producer_loop(self) -> None:
        """
        Watchdog loop: start ffmpeg, read frames, detect tags and enqueue
        the result. ffmpeg is restarted after a failure.

        ``max_attempts`` (if > 0) limits *consecutive* failed attempts: the
        counter is reset as soon as a complete frame has been read.
        """
        attempts = 0
        while not self._stop_event.is_set():
            attempts += 1
            if self.max_attempts > 0 and attempts > self.max_attempts:
                logger.error("Maximale Reconnect-Versuche erreicht. Producer stoppt.")
                break

            logger.info(f"ffmpeg starten (Versuch {attempts})… URL: {self.rtsp_url}")
            proc = self._spawn_ffmpeg()
            if proc is None:
                self._stop_event.wait(self.reconnect_delay)
                continue

            self._ffmpeg = proc
            read_errors = 0

            try:
                while not self._stop_event.is_set():
                    raw = proc.stdout.read(self.frame_bytes)

                    # A short read means the pipe hit EOF or ffmpeg died.
                    if len(raw) != self.frame_bytes:
                        read_errors += 1
                        if read_errors >= 3:
                            logger.warning("Zu viele Frame-Lesefehler — Reconnect")
                            break
                        continue

                    read_errors = 0
                    # The stream is healthy again: only consecutive failures
                    # should count against max_attempts.
                    attempts = 0
                    tags = self._process_frame(raw)
                    self._enqueue(tags)

            except Exception as e:
                # Thread boundary: log with traceback and fall through to
                # the reconnect instead of letting the thread die silently.
                logger.exception(f"Fehler im Producer-Thread: {e}")
            finally:
                self._kill_ffmpeg()

            if not self._stop_event.is_set():
                logger.info(f"Reconnect in {self.reconnect_delay:.1f}s…")
                self._stop_event.wait(self.reconnect_delay)

    def _spawn_ffmpeg(self) -> Optional[subprocess.Popen]:
        """Start ffmpeg so that it writes raw bgr24 frames to its stdout.

        Returns:
            The running process, or ``None`` if ffmpeg could not be started
            (the reason is logged).
        """
        cmd = [
            "ffmpeg",
            "-rtsp_transport", self.rtsp_transport,
            "-fflags", "nobuffer",
            "-flags", "low_delay",
            "-probesize", "32",
            "-analyzeduration", "0",
            "-vsync", "0",
            "-i", self.rtsp_url,
            "-an", "-sn", "-dn",
            "-pix_fmt", "bgr24",
            "-f", "rawvideo",
            "-",
        ]
        try:
            return subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                bufsize=self.frame_bytes,
            )
        except FileNotFoundError:
            logger.error("ffmpeg nicht gefunden — bitte installieren")
            return None
        except Exception as e:
            logger.error(f"ffmpeg-Start fehlgeschlagen: {e}")
            return None

    def _kill_ffmpeg(self) -> None:
        """Terminate the current ffmpeg process, reap it and close its pipe.

        Safe to call when no process is running. May be called from both
        ``stop()`` and the producer thread.
        """
        proc = self._ffmpeg
        self._ffmpeg = None
        if proc is None:
            return

        if proc.poll() is None:
            try:
                proc.terminate()
                proc.wait(timeout=3.0)
            except subprocess.TimeoutExpired:
                # ffmpeg ignored SIGTERM: force it and reap the process so
                # that no zombie is left behind.
                proc.kill()
                try:
                    proc.wait(timeout=3.0)
                except subprocess.TimeoutExpired:
                    logger.error("ffmpeg ließ sich nicht beenden")
            except OSError as e:
                logger.warning(f"ffmpeg konnte nicht beendet werden: {e}")

        # Release the pipe's file descriptor, but only once the process is
        # really gone, so a reader still blocked on the pipe is not stalled.
        if proc.stdout is not None and proc.poll() is not None:
            try:
                proc.stdout.close()
            except (OSError, ValueError):
                pass

    def _enqueue(self, tags: list) -> None:
        """Hand ``tags`` to the event loop; an older result is dropped if full."""
        if self._queue is None or self._loop is None:
            return
        try:
            # Called from the producer thread: asyncio.Queue must only be
            # touched from the loop thread, hence call_soon_threadsafe.
            self._loop.call_soon_threadsafe(self._queue_put_nowait, tags)
        except RuntimeError:
            # The event loop is already closed (application shutdown).
            # There is no consumer left, so the result is dropped on purpose
            # instead of being reported as a camera failure.
            return

    def _queue_put_nowait(self, tags: list) -> None:
        """Runs in the event-loop thread. Replaces an older, unconsumed result."""
        if self._queue is None:
            return
        if self._queue.full():
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        try:
            self._queue.put_nowait(tags)
        except asyncio.QueueFull:
            pass

    # ------------------------------------------------------------------
    # Frame processing (runs in the producer thread)
    # ------------------------------------------------------------------

    def _process_frame(self, raw: bytes) -> list:
        """Detect tags in one raw frame and return the current tag list.

        Args:
            raw: Exactly ``frame_bytes`` bytes of bgr24 pixel data.

        Returns:
            One dict per tag that was measured within ``hold_seconds``,
            with the keys ``id``, ``x``, ``y``, ``theta``, ``age``, ``vx``,
            ``vy`` and ``omega``.
        """
        frame = np.frombuffer(raw, dtype=np.uint8).reshape(
            (self.height, self.width, 3)
        )
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        detections = self.detector.detect(gray)
        now = time.time()

        for det in detections:
            self._update_tag_state(det, now)

        return self._collect_output(now)

    def _update_tag_state(self, det, now: float) -> None:
        """Fold one detection into the smoothed per-tag state."""
        tag_id = int(det.tag_id)
        cx, cy = float(det.center[0]), float(det.center[1])
        # Heading from corner 0 to corner 1 as atan2(dx, -dy), in degrees.
        p0, p1 = det.corners[0], det.corners[1]
        theta_meas = float(np.degrees(
            np.arctan2(p1[0] - p0[0], -(p1[1] - p0[1]))
        ))

        s = self.state.get(tag_id)
        if s is None:
            # First sighting: start from the raw measurement at rest.
            self.state[tag_id] = {
                "x": cx, "y": cy, "theta": theta_meas,
                "vx": 0.0, "vy": 0.0, "omega": 0.0,
                "last_meas": now, "last_pose_ts": now,
                "lx": cx, "ly": cy, "lt": theta_meas,
            }
            return

        x = ema(s["x"], cx, self.alpha_pos)
        y = ema(s["y"], cy, self.alpha_pos)
        th = ema_angle_deg(s["theta"], theta_meas, self.alpha_ang)

        dt = now - s["last_pose_ts"]
        # Skip the velocity update for (near-)zero time steps to avoid
        # dividing by a tiny dt.
        if dt > 1e-3:
            s["vx"] = ema(s["vx"], (x - s["lx"]) / dt, self.alpha_vel)
            s["vy"] = ema(s["vy"], (y - s["ly"]) / dt, self.alpha_vel)
            s["omega"] = ema(
                s["omega"], wrap_angle_deg(th - s["lt"]) / dt, self.alpha_vel
            )

        s.update({
            "x": x, "y": y, "theta": th,
            "last_meas": now, "last_pose_ts": now,
            "lx": x, "ly": y, "lt": th,
        })

    def _collect_output(self, now: float) -> list:
        """Build the output list and drop tags not seen for ``hold_seconds``."""
        out = []
        expired = []
        for tag_id, s in self.state.items():
            age = now - s["last_meas"]
            if age > self.hold_seconds:
                expired.append(tag_id)
                continue

            if age <= self.pred_seconds:
                # Recently seen: extrapolate with the estimated velocities.
                x = s["x"] + s["vx"] * age
                y = s["y"] + s["vy"] * age
                th = wrap_angle_deg(s["theta"] + s["omega"] * age)
            else:
                # Held but too old to extrapolate: report the last pose.
                x, y, th = s["x"], s["y"], s["theta"]

            out.append({
                "id": tag_id,
                "x": round(x, 2),
                "y": round(y, 2),
                "theta": round(th, 2),
                "age": round(age, 3),
                "vx": round(s["vx"], 2),
                "vy": round(s["vy"], 2),
                "omega": round(s["omega"], 2),
            })

        # Deleted after the loop: the dict must not change while iterating.
        for tag_id in expired:
            del self.state[tag_id]

        return out
|