Initial commit: HoloDeck v010 Server + v011 ESP32 Firmware

This commit is contained in:
Franke 2026-04-01 22:09:30 +02:00
commit cca1a5f653
14 changed files with 2637 additions and 0 deletions

5
.gitignore vendored Normal file
View file

@ -0,0 +1,5 @@
__pycache__/
*.pyc
logs/
*.log
.env

643
README.md Normal file
View file

@ -0,0 +1,643 @@
# HoloDeck — AprilTag-Robotersteuerung für den Unterricht
**Pädagogische Hochschule Weingarten · FrankeLab**
Ein overhead-kamerabasiertes System zur autonomen Navigation physischer Differentialantriebs-Roboter in Klassenzimmern. Schülerinnen und Schüler programmieren die Roboter über eine grafische Blockly-Oberfläche.
---
## Inhaltsverzeichnis
1. [Systemübersicht](#1-systemübersicht)
2. [Hardware-Komponenten](#2-hardware-komponenten)
3. [Verdrahtung & Schaltplan](#3-verdrahtung--schaltplan)
4. [Netzwerk & Infrastruktur](#4-netzwerk--infrastruktur)
5. [AprilTag-Layout](#5-apriltag-layout)
6. [Software-Architektur](#6-software-architektur)
7. [Entwicklungsgeschichte (v005v011)](#7-entwicklungsgeschichte-v005v011)
8. [Deployment (Server)](#8-deployment-server)
9. [Firmware flashen (ESP32)](#9-firmware-flashen-esp32)
10. [Konfiguration (config.yaml)](#10-konfiguration-configyaml)
11. [Kommunikationsprotokoll](#11-kommunikationsprotokoll)
12. [Regler-Tuning](#12-regler-tuning)
13. [Bekannte Probleme & Workarounds](#13-bekannte-probleme--workarounds)
14. [Roadmap](#14-roadmap)
---
## 1. Systemübersicht
```
┌─────────────────────────────────────────────────────────────┐
│ Overhead-Kamera (HuskyLens 2) │
│ RTSP → 192.168.0.30:8554/live │
└────────────────────┬────────────────────────────────────────┘
│ RTSP (UDP, 1280×720)
┌─────────────────────────────────────────────────────────────┐
│ Ubuntu-Server 192.168.0.250 │
│ /opt/robot-tracker-v010/ │
│ ┌──────────┐ ┌───────────┐ ┌──────────────┐ │
│ │ tracker │ │ collision │ │ navlogger │ │
│ │ AprilTag │→ │ Potential │→ │ CSV-Logging │ │
│ │ Detektor │ │ -feld PID │ │ │ │
│ └──────────┘ └───────────┘ └──────────────┘ │
│ FastAPI / uvicorn Port 80 │
│ TCP-Server Port 8765 │
└────────────────┬────────────────────────────────────────────┘
│ TCP (JSON, newline-delimited)
┌─────────────────────────────────────────────────────────────┐
│ Roboter (Freenove ESP32-S3 + MicroPython v1.23) │
│ WiFi FrankeLab → 192.168.0.250:8765 │
│ ┌──────────┐ ┌────────────┐ ┌────────────┐ │
│ │ PID-Loop │ │ Encoder │ │ Motoren │ │
│ │ 20 Hz │← │ IRQ 50ms │ │ L9110S │ │
│ └──────────┘ └────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
---
## 2. Hardware-Komponenten
### Server / Kamera
| Komponente | Details |
|---|---|
| Server | Ubuntu 24.04, statisch 192.168.0.250 |
| Kamera | HuskyLens 2 (RTSP-Modus) |
| Kamera-Mount | Overhead-Befestigung zentral über der Spielfläche |
### Roboter (ein Exemplar, erweiterbar)
| Komponente | Bezeichnung / Bestellnummer |
|---|---|
| Chassis | DollaTek 2WD Roboter-Chassis-Kit |
| Mikrocontroller | Freenove ESP32-S3 WROOM Board Lite |
| Breakout-Board | Freenove ESP32-S3 Breakout Board (Schraubklemmen) |
| Motortreiber | L9110S-Modul (Schraubklemmen, 2-Kanal) |
| Encoder-Sensoren | Optokoppler-Geschwindigkeitssensor (74HC14D-basiert, 3,35V) |
| Powerbank | Intenso XS5000 (USB-A + USB-C Ausgang) |
| AprilTag (Roboter) | Tag36h11, ID 91, auf Aufkleber am Roboter |
> **Hinweis Motortereiber-Geschichte:** Zunächst wurde ein handgelöteter TB6612FNG auf Lochraster verwendet. Dieser wurde durch das deutlich handhabbarere L9110S-Schraubklemmen-Modul ersetzt.
> **WARNUNG:** Ein früheres AZDelivery-ESP32-Board wurde wahrscheinlich durch gleichzeitigen USB+Akku-Betrieb beschädigt. **Niemals** gleichzeitig USB und Powerbank anlegen wenn GPIO-Pins des Motortreibers auf HIGH liegen!
---
## 3. Verdrahtung & Schaltplan
### Stromversorgung — ZWEI GETRENNTE RAILS (kritisch!)
Das System verwendet **zwei physisch getrennte Stromversorgungskreise**, weil WiFi-Peak (200400 mA) + zwei Motoren den 5V-Rail eines einzigen USB-Anschlusses überlasten:
```
┌─────────────────────┐ ┌──────────────────────────────────┐
│ Intenso XS5000 │ │ Intenso XS5000 │
│ USB-C Ausgang │ │ USB-A Ausgang │
│ 5V → ESP32 (VIN) │ │ Kabel aufschneiden: │
│ │ │ Roter Draht → VM (Motortreiber) │
└─────────────────────┘ └──────────────────────────────────┘
│ │
│ USB-C Kabel │ Modifiziertes USB-A-Kabel
▼ ▼
ESP32-S3 (USB-C) VM-Pin L9110S
Logikversorgung Motorversorgung
GND verbunden: ESP32 GND ←——→ L9110S GND (gemeinsame Masse!)
```
**Wichtig:** Das schwarze (GND) Kabel des USB-A-Kabels wird ebenfalls mit dem ESP32-GND verbunden. Die Datenleitungen (grün/weiß) bleiben unbenutzt / abgeklebt.
---
### GPIO-Belegung ESP32-S3
| GPIO | Funktion | Verbindung |
|---|---|---|
| GPIO 17 | AIN1 (Motor A Richtung 1) | L9110S Pin IA1 |
| GPIO 18 | AIN2 (Motor A Richtung 2) | L9110S Pin IA2 |
| GPIO 8 | PWMA (Motor A Geschwindigkeit) | L9110S Pin — via Jumper oder direkt |
| GPIO 9 | BIN1 (Motor B Richtung 1) | L9110S Pin IB1 |
| GPIO 10 | BIN2 (Motor B Richtung 2) | L9110S Pin IB2 |
| GPIO 11 | PWMB (Motor B Geschwindigkeit) | L9110S Pin — |
| GPIO 6 | Encoder Links (Interrupt) | Optokoppler OUT |
| GPIO 7 | Encoder Rechts (Interrupt) | Optokoppler OUT |
| 3V3 | Encoder VCC | Optokoppler VCC |
| GND | Gemeinsame Masse | L9110S GND, Encoder GND |
> **Achtung GPIO 12:** GPIO 12 ist am ESP32(-S3 nicht betroffen, aber klassischer ESP32) ein Strapping-Pin und verursacht Boot-Probleme. Wird hier nicht verwendet.
---
### L9110S Motor-Treiber Anschlussschema
```
ESP32-S3 L9110S Modul
───────── ────────────
GPIO 17 ──────────────────→ IA1 OA1 ──→ Motor A (links) Klemme 1
GPIO 18 ──────────────────→ IA2 OA2 ──→ Motor A (links) Klemme 2
GPIO 8 ──────── (PWM) ──→ IA1 oder Jumper (je nach Modul)
GPIO 9 ──────────────────→ IB1 OB1 ──→ Motor B (rechts) Klemme 1
GPIO 10 ──────────────────→ IB2 OB2 ──→ Motor B (rechts) Klemme 2
GPIO 11 ──────── (PWM) ──→ IB1 oder Jumper
3V3 ──────────────────→ VCC (Logik)
GND ──────────────────→ GND
USB-A rot ────────────────→ VM (Motorversorgung 5V)
```
> **Hinweis:** Das L9110S-Modul vereint Richtungssteuerung und PWM auf zwei Eingangspins pro Kanal (IA1/IA2). Die Drehrichtung ergibt sich aus dem Muster HIGH/LOW bzw. PWM/GND. Konkrete Zuordnung in `main.py → set_motor()`.
---
### Encoder-Sensor Anschluss
```
Optokoppler-Modul (74HC14D) ESP32-S3
─────────────────────────── ─────────
VCC ──────────────────────→ 3V3
GND ──────────────────────→ GND
OUT (linker Sensor) ──────→ GPIO 6
OUT (rechter Sensor) ──────→ GPIO 7
Lochscheibe: 20 Löcher/Umdrehung → 20 Pulse/Rev
```
---
### Befestigung
- Alle Elektronikmodule mit **Heißkleber** auf dem Chassis fixiert (sicher, rückbaubar)
- Encoder-Sensoren justiert auf Lochscheibe der Motorachsen
- AprilTag (ID 91) flach auf der Oberseite des Roboters aufgeklebt, muss von oben sichtbar sein
---
## 4. Netzwerk & Infrastruktur
| Komponente | IP-Adresse | Port |
|---|---|---|
| Ubuntu-Server | 192.168.0.250 (statisch) | — |
| FastAPI/Webinterface | 192.168.0.250 | 80 |
| TCP Robot-Server | 192.168.0.250 | 8765 |
| HuskyLens 2 (RTSP) | 192.168.0.30 | 8554 |
| Roboter (ESP32) | DHCP | — |
**WLAN:** `FrankeLab` (Passwort in `main.py` und `WIFI_PSW` Variable)
**Python-Umgebung Server:**
```bash
/opt/vision-venv/ # Virtuelle Umgebung
/opt/robot-tracker-v010/ # Aktueller Produktionsstand
```
---
## 5. AprilTag-Layout
Alle Tags aus der Familie **tag36h11**.
### Kalibrierungs-Tags (Referenzpunkte)
Auf einem A4-Blatt, fixiert im Sichtfeld der Kamera:
| Tag-ID | Position (mm) |
|---|---|
| 0 | (0.0, 0.0) — Ursprung |
| 1 | (100.5, 0.0) |
| 2 | (0.0, 181.0) |
| 3 | (100.5, 181.0) |
### Roboter-Tags
| Tag-ID | Bedeutung |
|---|---|
| 91 | Roboter 1 |
### Hindernis-Tags
| Tag-IDs |
|---|
| 11, 12, 21, 22, 31, 32, 41, 42, 51, 52, 61, 62, 71, 72, 81, 82, 92 |
Hindernisse werden als AprilTags auf dem Boden oder an Objekten platziert. Der Server berechnet automatisch Abstoßungskräfte (Potentialfeld).
---
## 6. Software-Architektur
### Server-Stack
```
app.py FastAPI-App, 20-Hz-Steuerungsloop, TCP-Server, WebSocket-Broadcast
tracker.py AprilTag-Detektion via pupil-apriltags, EMA-Filter, Dead Reckoning
collision.py Potentialfeld-Navigation: Zielanziehung + Hindernisabstoßung
navlogger.py CSV-Logging aller Navigationsdaten → logs/nav_YYYYMMDD_HHMMSS.csv
calibration.py Homographie-Kalibrierung (Kamerabild → Weltkoordinaten mm)
config.yaml Alle Parameter externalisiert
static/ Browser-Frontend (Live-Kamerabild + AprilTag-Overlay)
```
### ESP32-Firmware
```
main.py Komplette Roboter-Firmware (MicroPython)
├── WiFi-Verbindung mit Auto-Reconnect
├── TCP-Client → Server 192.168.0.250:8765
├── Encoder-Interrupts (GPIO 6, 7)
├── PID-Loop 20 Hz (asyncio)
├── Telemetrie-Loop 10 Hz (asyncio)
└── Feedforward-Architektur (v011, aktuell deaktiviert = Option B)
```
---
## 7. Entwicklungsgeschichte v005v011
### v005 — Stabile Tracking-Basis (Server only)
- **Kamera-I/O** auf dedizierten Background-Thread via `asyncio.Queue(maxsize=1)`
- **ffmpeg-Watchdog** mit Auto-Reconnect bei Kameraausfall
- **config.yaml** — alle Parameter externalisiert, kein Hardcoding mehr
- **Velocity-Vektor-Visualisierung** im Browser-Frontend
- Bestätigt: Merklich flüssiger als v004
### v006 — Grundlegende Navigation
- Erste TCP-Kommunikation mit ESP32
- Einfache Richtungs-Befehle (links/rechts/vorwärts)
- Kein Encoder-Feedback
### v007 — Homographie-Kalibrierung
- `calibration.py` eingeführt: 4 Referenz-Tags → Kamerabild-zu-Weltkoordinaten-Mapping
- Kamerakoordinaten erstmals in echte mm umgerechnet
- A4-Kalibrierblatt mit Tags 03 als Referenz
### v008 — Erste Hindernisvermeidung
- `collision.py` eingeführt: Potentialfeld-Ansatz
- Zielanziehung + Hindernisabstoßung
- Noch instabil: Spin-Probleme, zu aggressiv
### v008_log — Navigation-Logging
- `navlogger.py` hinzugefügt
- CSV-Logging aller Navigationsdaten für spätere Analyse
- Log-Dateien in `logs/nav_DATUM_UHRZEIT.csv`
### v009 — Encoder-PID + Erste stabile Navigation
**Meilenstein: Erster Roboter fährt autonom zum Ziel**
- **Encoder-Interrupts** auf GPIO 6/7
- **PID-Regler** für Radgeschwindigkeit: `KP=1.2, KI=0.3, KD=0.05`
- **Kommunikationsprotokoll** auf mm/s umgestellt (server→robot: `{"speed_l": N, "speed_r": N}`)
- **TCP statt WebSocket** für Robot-Kommunikation (simpler, stabiler)
- **Dead Reckoning**: Bei Tag-Verlust schätzt ESP32 Position aus Encoder-Daten
- Hardware-Parameter in config.yaml: `mm_per_pulse=10.525`, `axle_width_mm=121.0`
- **PWM-Minimum 150** damit Motoren sicher anlaufen
### v009_fix → v009_spin_fix → v009_goal_fix
Iterative Fixes basierend auf Log-Analyse:
- Spin-Schwelle angepasst
- Ziel-Erreicht-Erkennung verbessert
- Repulsionskraft von 5000 → 1500 (war zu aggressiv)
### v010 — Stabilisierte Navigation (aktueller Produktionsstand)
**Serverseitige Verbesserungen:**
| Parameter | Alt | Neu | Grund |
|---|---|---|---|
| `FORCE_ALPHA` | 0.15 | 0.35 | Zu träge → Endlos-Spin |
| `SPIN_THRESHOLD` | 120° | 90° | 120° zu hoch, Roboter überschoss |
| `SPIN_STOP` | — | 35° | Sauberes Spin-Ende |
| `SPIN_SLOWDOWN` | — | 60° | Sanftes Abbremsen |
| `HEADING_DEADBAND` | — | 6° | Unter 6° kein Kurskorrektur |
| `repulsion_strength` | 5000 | 1500 | Sanftere Abstoßung |
| Controller-Frequenz | 10 Hz | 20 Hz | Doppelte Reaktionsgeschwindigkeit |
| `reset_robot()` | — | ✓ | Roboter stoppt bei Verbindungsabbruch |
**Bug behoben:** `FORCE_ALPHA=0.15` war zu träge — der geglättete Kraftvektor folgte der echten Richtung so langsam, dass der Roboter beim Drehen immer wieder die Stopp-Schwelle überschoss und neu anfing zu drehen → Endlos-Spin.
### v011 — Feedforward-Architektur (aktuell auf ESP32)
**Firmware-Verbesserungen:**
Neue Feedforward-Architektur: `u = kS·sign(v) + kV·|v| + PID(error)`
- **kS (Haftreibungskompensation):** Fester PWM-Wert um Haftreibung zu überwinden
- **kV (Kennlinien-Linearisierung):** PWM proportional zur Sollgeschwindigkeit
- **PID regelt nur noch den kleinen Restfehler** statt alles alleine
- **Gleitendes Messfenster** `SPEED_WINDOW=3` (150ms) für Istgeschwindigkeit — bei 20 Pulsen/Rev ist 50ms zu grob (quantisiertes Rauschen)
- **Anti-Windup:** Integral einfrieren bei PWM-Sättigung
- **Option B (aktuell aktiv):** `KS=0, KV=0` → reiner PID wie v009, aber mit Messfenster und Anti-Windup
**Bugs gefunden und behoben in v011:**
*Bug 1: Motoren drehten nicht*
- Ursache: In v011 wird `current_pwm = ff + pid_out · sign` **direkt gesetzt** statt aufaddiert
- Mit `KS=KV=0` gibt feedforward() 0 zurück → PWM bleibt auf der ersten Stufe, zu klein für Motoranlauf
- Fix: Wenn `KS=KV=0` → aufaddieren wie v009 (`current_pwm += pid_out · sign`) + PWM_MIN erzwingen
```python
if KS_L == 0.0 and KV_L == 0.0:
current_pwm_l += pid_out_l * sign_l # aufaddieren
if sign_l > 0 and current_pwm_l < PWM_MIN:
current_pwm_l = float(PWM_MIN) # Mindest-PWM
else:
current_pwm_l = ff_l + pid_out_l * sign_l # Feedforward-Modus
```
---
## 8. Deployment (Server)
### Voraussetzungen
```bash
# Python-Umgebung
python3 -m venv /opt/vision-venv
source /opt/vision-venv/bin/activate
pip install fastapi uvicorn pupil-apriltags opencv-python-headless numpy pyyaml
```
### Aktuellen Stand deployen
```bash
# Dateien kopieren
scp -r v010/* root@192.168.0.250:/opt/robot-tracker-v010/
# Server starten
ssh root@192.168.0.250
cd /opt/robot-tracker-v010
/opt/vision-venv/bin/python -m uvicorn app:app --host 0.0.0.0 --port 80
```
### Schnell neu starten
```bash
ssh root@192.168.0.250 "pkill -f uvicorn; sleep 1; \
cd /opt/robot-tracker-v010 && \
/opt/vision-venv/bin/python -m uvicorn app:app --host 0.0.0.0 --port 80"
```
### Nur collision.py neu laden (kein Neustart nötig bei Code-Änderungen)
```bash
scp collision.py root@192.168.0.250:/opt/robot-tracker-v010/
# dann Server neu starten
```
### Logs einsehen
```bash
ssh root@192.168.0.250
ls -lt /opt/robot-tracker-v010/logs/
cat /opt/robot-tracker-v010/logs/nav_20260401_192534.csv | head -5
```
### Log-Format
```
timestamp, elapsed_s, robot_x_mm, robot_y_mm, robot_theta,
goal_x_mm, goal_y_mm, dist_to_goal_mm, angle_error_deg,
cmd_speed_l, cmd_speed_r, cmd_status,
num_obstacles, nearest_obstacle_id, nearest_obstacle_dist_mm,
all_obstacle_dists
```
`cmd_status` kann sein: `straight`, `curving`, `spinning`
---
## 9. Firmware flashen (ESP32)
### Voraussetzungen
```bash
# esptool aus Freenove-Toolchain
~/Downloads/Freenove_ESP32_S3_WROOM_Board_Lite-main/Python/Python_Firmware/
# mpremote für REPL-Zugriff
pip install mpremote
```
### Gerät erkennen
Der ESP32-S3 erscheint unter Ubuntu als `/dev/ttyACM0` (native USB, **nicht** `/dev/ttyUSB*`).
```bash
ls /dev/ttyACM*
```
### MicroPython flashen (nur wenn nötig)
```bash
# Flash löschen
python esptool.py --chip esp32s3 --port /dev/ttyACM0 erase_flash
# MicroPython flashen (v1.23.0)
python esptool.py --chip esp32s3 --port /dev/ttyACM0 write_flash \
-z 0x0 ESP32_GENERIC_S3-SPIRAM_OCT-20240602-v1.23.0.bin
```
### Firmware-Datei übertragen
```bash
# main.py auf ESP32 kopieren
mpremote connect /dev/ttyACM0 fs cp main.py :main.py
# Neustart
mpremote connect /dev/ttyACM0 reset
```
### REPL für Debugging
```bash
mpremote connect /dev/ttyACM0
# Dann Ctrl+C → interaktive Python-Konsole
```
---
## 10. Konfiguration (config.yaml)
```yaml
camera:
rtsp_url: "rtsp://192.168.0.30:8554/live"
width: 1280
height: 720
rtsp_transport: "udp"
reconnect_delay: 3.0
server:
host: "0.0.0.0"
port: 80
broadcast_hz: 30
tracker:
tag_family: "tag36h11"
detector_threads: 2
quad_decimate: 1.0
ema_alpha_pos: 0.35 # Positions-Glättung (0=träge, 1=roh)
ema_alpha_ang: 0.35 # Winkel-Glättung
ema_alpha_vel: 0.25 # Geschwindigkeits-Glättung
hold_seconds: 0.80 # Tag-Verlust Toleranz
predict_seconds: 0.25 # Prediction-Horizont
robots:
tcp_port: 8765
robot_tag_ids: [91]
goal_radius: 80 # mm — Ziel gilt als erreicht
speed_base: 160 # mm/s — Fahrgeschwindigkeit
speed_turn: 90 # mm/s — Spin-Geschwindigkeit
speed_min: 50
speed_max: 350
angle_threshold: 120 # Grad — nur noch für Spin-Trigger
calibration:
reference_tags:
0: [0.0, 0.0 ]
1: [100.5, 0.0 ]
2: [0.0, 181.0]
3: [100.5, 181.0]
obstacles:
safe_distance: 150 # mm — Abstoßungsbereich
repulsion_strength: 1500 # Stärke der Abstoßung
obstacle_tag_ids: [11,12,21,22,31,32,41,42,51,52,61,62,71,72,81,82,92]
robot_hardware:
wheel_diameter_mm: 67.0
wheel_circumference_mm: 210.5
encoder_pulses_per_rev: 20
mm_per_pulse: 10.525 # 210.5 / 20
axle_width_mm: 121.0 # Radmitte links → Radmitte rechts
pulses_per_360deg: 36.1 # π × axle_width / mm_per_pulse
```
---
## 11. Kommunikationsprotokoll
### Server → Roboter (TCP, newline-delimited JSON)
```json
{"speed_l": 160.0, "speed_r": 160.0}
```
Beide Werte in **mm/s**. Negativ = rückwärts. `0.0` = Stop.
### Roboter → Server (alle 100 ms)
```json
{
"robot_id": 91,
"enc_l": 1234,
"enc_r": 1235,
"speed_l": 158.3,
"speed_r": 161.1,
"pwm_l": 420,
"pwm_r": 435
}
```
| Feld | Bedeutung |
|---|---|
| `enc_l/r` | Absoluter Encoder-Zähler (monoton steigend) |
| `speed_l/r` | Istgeschwindigkeit in mm/s (aus gleitendem Fenster) |
| `pwm_l/r` | Aktueller PWM-Wert (0950) |
---
## 12. Regler-Tuning
### Serverseitig (collision.py)
| Parameter | Aktuell | Wirkung |
|---|---|---|
| `SPIN_THRESHOLD` | 90° | Ab diesem Winkel dreht Roboter auf der Stelle |
| `SPIN_STOP` | 35° | Unterhalb: Spin beendet, Kurvenfahrt beginnt |
| `SPIN_SLOWDOWN` | 60° | Spin wird sanfter |
| `FORCE_ALPHA` | 0.35 | EMA-Glättung des Kraftvektors (0=träge, 1=roh) |
| `HEADING_DEADBAND` | 6° | Winkeltoleranz für Geradeausfahrt |
| `repulsion_strength` | 1500 | Abstoßungsstärke von Hindernissen |
| `goal_radius` | 80 mm | Ziel gilt als erreicht |
### ESP32-seitig (main.py) — Option A: Feedforward aktivieren
```
Schritt 1 — kS bestimmen (Haftreibung):
→ speed_base sehr klein setzen (z.B. 20 mm/s)
→ KS_L/R erhöhen bis Räder gerade anfangen zu drehen
→ Typisch: 130170
Schritt 2 — kV bestimmen (Kennlinie):
→ speed_base = 160 mm/s, KS bereits gesetzt
→ Log beobachten: pwm_l/pwm_r bei Geradeausfahrt ablesen
→ kV = (PWM_gemessen - kS) / 160
→ Typisch: 2.03.5
Schritt 3 — PID reduzieren:
→ KP von 1.2 auf ~0.4 reduzieren
→ KI von 0.3 auf ~0.08 reduzieren
→ KD bleibt ~0.02
Schritt 4 — Links/Rechts-Asymmetrie:
→ kS_L ≠ kS_R fein abgleichen bis Geradeausfahrt ohne Drift
```
### ESP32-seitig — Option B (aktuell): Reiner PID
```python
KS_L = KS_R = 0.0 # Feedforward aus
KV_L = KV_R = 0.0
KP = 1.2
KI = 0.3
KD = 0.05
PWM_MIN = 150 # Erzwungenes Mindest-PWM
```
---
## 13. Bekannte Probleme & Workarounds
| Problem | Ursache | Workaround / Fix |
|---|---|---|
| Endlos-Spin | `FORCE_ALPHA` zu klein, Kraftvektor folgt zu langsam | `FORCE_ALPHA=0.35`, `SPIN_THRESHOLD=90°` |
| Motoren laufen nicht an | PWM_MIN fehlt im direkten Feedforward-Pfad | `PWM_MIN=150` erzwingen wenn `KS=0` |
| ESP32 bootet nicht | GPIO 12 belegt (Strapping-Pin) | GPIO 12 nicht verwenden |
| ESP32 beschädigt | Gleichzeitig USB + Powerbank + Motoren | Stets separate Stromkreise! |
| Kamera-Freeze | ffmpeg verliert RTSP-Stream | Watchdog mit Auto-Reconnect in v005+ |
| Tag nicht erkannt | Schlechte Lichtverhältnisse | Gleichmäßige Beleuchtung, möglichst kein direktes Sonnenlicht |
| Ziel nie erreicht | `goal_radius` zu klein, Kamera-Jitter | `goal_radius` auf 100120 mm erhöhen |
| Ziel wird weggedrückt | Hindernis-Repulsion nahe am Ziel | `repulsion_strength` bei kleiner `dist_to_goal` abschwächen (TODO) |
---
## 14. Roadmap
### Kurzfristig (Tuning)
- [ ] Feedforward-Koeffizienten `kS`, `kV` experimentell bestimmen und aktivieren
- [ ] `goal_radius` auf 100 mm erhöhen
- [ ] Repulsion nahe am Ziel abschwächen
- [ ] `axle_width_mm` und `pulses_per_360deg` exakt nachmessen
### Mittelfristig (Architektur)
- [ ] v/ω-Regler statt direktem Innen-/Außenrad-Schema (`collision.py`)
- [ ] Weiches Dead-Reckoning/Vision-Fusion (kein harter Sprung bei Tag-Wiedersehen)
- [ ] Mehrere Roboter gleichzeitig (IDs 91, 92, ...)
- [ ] Seeed Grove I2C TB6612 für einfachere Multi-Roboter-Verdrahtung
### Langfristig (Unterrichts-Interface)
- [ ] **Blockly-Interface** für Schülerinnen und Schüler
- [ ] Ziel per Klick im Browser setzen (bereits teilweise vorhanden)
- [ ] Mehrere Ziele als Route definieren
- [ ] Begegnungs-/Kooperationsszenarien
---
## Lizenz
Entwickelt für Unterrichtszwecke an der Pädagogischen Hochschule Weingarten.
Internes Projekt — nicht zur öffentlichen Weiterverbreitung bestimmt.
---
*Zuletzt aktualisiert: April 2026 — v011 Firmware, v010 Server*

399
app.py Normal file
View file

@ -0,0 +1,399 @@
"""
app.py Robot Tracker v009
- Encoder-PID: Server schickt mm/s statt PWM
- Dead Reckoning: Position schätzen wenn Tag unsichtbar
- Sanfteres Drehen mit Encoder-Feedback
- NavLogger mit Geschwindigkeitsdaten
"""
import asyncio
import json
import logging
import math
import time
from pathlib import Path
import yaml
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from tracker import TagTracker
from calibration import Calibration
from collision import compute_avoidance, force_to_speeds, reset_robot
from navlogger import NavLogger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger("app")
_cfg_path = Path(__file__).parent / "config.yaml"
with open(_cfg_path) as f:
cfg = yaml.safe_load(f)
server_cfg = cfg["server"]
robot_cfg = cfg["robots"]
cal_cfg = cfg["calibration"]
obs_cfg = cfg["obstacles"]
hw_cfg = cfg["robot_hardware"]
broadcast_interval = 1.0 / server_cfg.get("broadcast_hz", 30)
ROBOT_TAG_IDS = robot_cfg["robot_tag_ids"]
GOAL_RADIUS = robot_cfg["goal_radius"]
SPEED_BASE = robot_cfg["speed_base"]
SPEED_TURN = robot_cfg["speed_turn"]
SPEED_MIN = robot_cfg["speed_min"]
SPEED_MAX = robot_cfg["speed_max"]
ANGLE_THRESHOLD = robot_cfg["angle_threshold"]
TCP_PORT = robot_cfg["tcp_port"]
DR_TIMEOUT = robot_cfg["dead_reckoning_timeout"]
SAFE_DISTANCE = obs_cfg["safe_distance"]
REPULSION = obs_cfg["repulsion_strength"]
OBSTACLE_IDS = set(obs_cfg["obstacle_tag_ids"])
REFERENCE_IDS = set(int(k) for k in cal_cfg["reference_tags"].keys())
MM_PER_PULSE = hw_cfg["mm_per_pulse"]
AXLE_WIDTH = hw_cfg["axle_width_mm"]
STUCK_TIMEOUT = 1.5
STUCK_THRESHOLD = 15
robots: dict = {} # robot_id → {"writer": ..., "enc_l": 0, "enc_r": 0, "speed_l": 0, "speed_r": 0}
goal: dict = {}
calibration = Calibration(cal_cfg["reference_tags"])
nav_logger = NavLogger("logs")
# Dead Reckoning Zustand
robot_dr: dict = {} # robot_id → {"x": mm, "y": mm, "theta": deg, "enc_l": int, "enc_r": int, "time": float}
robot_last_pos: dict = {}
app = FastAPI(title="Robot Tracker v009")
app.mount("/static", StaticFiles(directory="static"), name="static")
tracker = TagTracker(cfg)
browser_clients: set[WebSocket] = set()
# ---------------------------------------------------------------------------
# TCP Robot-Server
# ---------------------------------------------------------------------------
async def handle_robot(reader, writer) -> None:
robot_id = None
addr = writer.get_extra_info("peername")
logger.info(f"TCP Verbindung von {addr}")
try:
while True:
line = await reader.readline()
if not line:
break
data = json.loads(line.decode().strip())
if robot_id is None:
robot_id = data.get("robot_id")
robots[robot_id] = {
"writer": writer,
"enc_l": 0, "enc_r": 0,
"speed_l": 0.0, "speed_r": 0.0,
"pwm_l": 0, "pwm_r": 0,
"last_enc_l": 0, "last_enc_r": 0,
"last_telem_time": time.time(),
}
logger.info(f"Roboter {robot_id} verbunden (v={data.get('version','?')})")
if "enc_l" in data and robot_id:
r = robots[robot_id]
r["enc_l"] = data["enc_l"]
r["enc_r"] = data["enc_r"]
r["speed_l"] = data.get("speed_l", 0.0)
r["speed_r"] = data.get("speed_r", 0.0)
r["pwm_l"] = data.get("pwm_l", 0)
r["pwm_r"] = data.get("pwm_r", 0)
r["last_telem_time"] = time.time()
# Dead Reckoning aktualisieren
_update_dead_reckoning(robot_id, data)
except Exception as e:
logger.warning(f"Roboter {robot_id} Fehler: {e}")
finally:
if robot_id and robot_id in robots:
del robots[robot_id]
reset_robot(robot_id) # Kraft-Glättungs-Zustand zurücksetzen
writer.close()
logger.info(f"Roboter {robot_id} getrennt")
def _update_dead_reckoning(robot_id, telem: dict):
"""Aktualisiert Dead-Reckoning Position aus Encoder-Telemetrie."""
if robot_id not in robot_dr:
return
dr = robot_dr[robot_id]
new_enc_l = telem["enc_l"]
new_enc_r = telem["enc_r"]
delta_l = (new_enc_l - dr["enc_l"]) * MM_PER_PULSE
delta_r = (new_enc_r - dr["enc_r"]) * MM_PER_PULSE
dr["enc_l"] = new_enc_l
dr["enc_r"] = new_enc_r
# Richtung aus letztem Befehl
if robots.get(robot_id, {}).get("speed_l", 0) < 0:
delta_l = -delta_l
if robots.get(robot_id, {}).get("speed_r", 0) < 0:
delta_r = -delta_r
dist = (delta_l + delta_r) / 2.0
d_theta = math.degrees((delta_r - delta_l) / AXLE_WIDTH)
theta_rad = math.radians(dr["theta"] + d_theta / 2)
dr["x"] += dist * math.sin(theta_rad)
dr["y"] += dist * -math.cos(theta_rad)
dr["theta"] = (dr["theta"] + d_theta + 180) % 360 - 180
dr["time"] = time.time()
async def send_to_robot(robot_id: int, cmd: dict) -> None:
r = robots.get(robot_id)
if r:
try:
msg = json.dumps({
"speed_l": round(cmd["speed_l"], 1),
"speed_r": round(cmd["speed_r"], 1),
}) + "\n"
r["writer"].write(msg.encode())
await r["writer"].drain()
except Exception as e:
logger.warning(f"Senden an Roboter {robot_id} fehlgeschlagen: {e}")
# ---------------------------------------------------------------------------
# Regler-Loop
# ---------------------------------------------------------------------------
def check_stuck(robot_id, rx, ry) -> bool:
now = time.time()
if robot_id not in robot_last_pos:
robot_last_pos[robot_id] = (rx, ry, now)
return False
lx, ly, lt = robot_last_pos[robot_id]
dist_moved = math.sqrt((rx - lx)**2 + (ry - ly)**2)
if dist_moved > STUCK_THRESHOLD:
robot_last_pos[robot_id] = (rx, ry, now)
return False
if now - lt > STUCK_TIMEOUT:
logger.warning(f"Roboter {robot_id} steckt fest!")
robot_last_pos[robot_id] = (rx, ry, now)
return True
return False
async def controller_loop() -> None:
while True:
await asyncio.sleep(0.05) # 20 Hz statt 10 Hz — schnellere Reaktion
if not goal:
continue
for robot_tag_id in ROBOT_TAG_IDS:
if robot_tag_id not in robots:
continue
# Position bestimmen — Kamera oder Dead Reckoning
tag_state = tracker.state.get(robot_tag_id)
use_dr = False
if tag_state:
age = time.time() - tag_state.get("last_meas", 0)
if age > DR_TIMEOUT and robot_tag_id in robot_dr:
use_dr = True
dr = robot_dr[robot_tag_id]
rx, ry, rtheta = dr["x"], dr["y"], dr["theta"]
logger.debug(f"Dead Reckoning für Roboter {robot_tag_id}")
else:
if calibration.calibrated:
rx, ry = calibration.pixel_to_world(tag_state["x"], tag_state["y"])
else:
rx, ry = tag_state["x"], tag_state["y"]
rtheta = tag_state["theta"]
# Dead Reckoning mit Kamera-Position initialisieren
robot_dr[robot_tag_id] = {
"x": rx, "y": ry, "theta": rtheta,
"enc_l": robots[robot_tag_id]["enc_l"],
"enc_r": robots[robot_tag_id]["enc_r"],
"time": time.time(),
}
else:
if robot_tag_id in robot_dr:
use_dr = True
dr = robot_dr[robot_tag_id]
rx, ry, rtheta = dr["x"], dr["y"], dr["theta"]
else:
continue
# Zieldistanz
dx = goal["x"] - rx
dy = goal["y"] - ry
dist = math.sqrt(dx**2 + dy**2)
if dist < GOAL_RADIUS:
logger.info(f"Roboter {robot_tag_id} Ziel erreicht! {'(DR)' if use_dr else ''}")
goal.clear()
robot_last_pos.pop(robot_tag_id, None)
await send_to_robot(robot_tag_id, {"speed_l": 0.0, "speed_r": 0.0})
continue
# Anti-Stecken
if check_stuck(robot_tag_id, rx, ry):
await send_to_robot(robot_tag_id, {"speed_l": -80.0, "speed_r": -80.0})
await asyncio.sleep(0.4)
continue
# Hindernisse sammeln
obstacles = []
for tid, state in tracker.state.items():
if tid == robot_tag_id or tid in REFERENCE_IDS or tid not in OBSTACLE_IDS:
continue
if calibration.calibrated:
ox, oy = calibration.pixel_to_world(state["x"], state["y"])
else:
ox, oy = state["x"], state["y"]
obstacles.append({"id": tid, "x": ox, "y": oy})
# Potentialfeld
fx, fy = compute_avoidance(rx, ry, goal["x"], goal["y"], obstacles, SAFE_DISTANCE, REPULSION)
# Geschwindigkeitsbefehl
cmd = force_to_speeds(
fx, fy, rtheta,
SPEED_BASE, SPEED_TURN, SPEED_MIN, SPEED_MAX,
ANGLE_THRESHOLD, AXLE_WIDTH,
robot_id=robot_tag_id,
dist_to_goal=dist,
)
# Logging
target_angle = math.degrees(math.atan2(fx, -fy))
angle_error = (target_angle - rtheta + 180) % 360 - 180
nav_logger.log(rx, ry, rtheta, goal["x"], goal["y"], cmd, obstacles, angle_error)
await send_to_robot(robot_tag_id, cmd)
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
@app.on_event("startup")
async def startup() -> None:
loop = asyncio.get_running_loop()
tracker.start(loop)
tcp_server = await asyncio.start_server(handle_robot, "0.0.0.0", TCP_PORT)
asyncio.create_task(tcp_server.serve_forever(), name="tcp-server")
asyncio.create_task(broadcast_loop(), name="broadcast")
asyncio.create_task(controller_loop(), name="controller")
logger.info("Server v009 gestartet — Encoder-PID aktiv")
@app.on_event("shutdown")
async def shutdown() -> None:
tracker.stop()
nav_logger.close()
logger.info("Server gestoppt")
# ---------------------------------------------------------------------------
# Broadcast
# ---------------------------------------------------------------------------
async def broadcast_loop() -> None:
while True:
tags = await tracker.get_tags()
if not calibration.calibrated:
calibration.update(tags)
tags_out = []
obstacles_out = []
for t in tags:
t2 = dict(t)
if calibration.calibrated and t["id"] not in REFERENCE_IDS:
x_mm, y_mm = calibration.pixel_to_world(t["x"], t["y"])
t2["x_mm"] = round(x_mm, 1)
t2["y_mm"] = round(y_mm, 1)
# Geschwindigkeit aus Roboter-Telemetrie hinzufügen
if t["id"] in robots:
r = robots[t["id"]]
t2["speed_actual_l"] = round(r.get("speed_l", 0), 1)
t2["speed_actual_r"] = round(r.get("speed_r", 0), 1)
t2["speed_mm_s"] = round(
(abs(r.get("speed_l", 0)) + abs(r.get("speed_r", 0))) / 2, 1
)
tags_out.append(t2)
if t["id"] in OBSTACLE_IDS:
obstacles_out.append({
"id": t["id"], "x": t["x"], "y": t["y"],
"x_mm": t2.get("x_mm"), "y_mm": t2.get("y_mm"),
})
payload = {
"tags": tags_out,
"goal": goal if goal else None,
"calibrated": calibration.calibrated,
"robots_connected": list(robots.keys()),
"field_corners_px": calibration.get_field_corners_px(),
"obstacles": obstacles_out,
}
dead: set[WebSocket] = set()
for ws in list(browser_clients):
try:
await ws.send_json(payload)
except Exception:
dead.add(ws)
browser_clients.difference_update(dead)
await asyncio.sleep(broadcast_interval)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/")
def wizard():
return FileResponse("static/wizard.html")
@app.get("/view")
def view():
return FileResponse("static/view.html")
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket) -> None:
await ws.accept()
browser_clients.add(ws)
logger.info(f"Browser verbunden ({len(browser_clients)} gesamt)")
try:
while True:
msg = await ws.receive_text()
data = json.loads(msg)
if data.get("type") == "goal":
gx, gy = data["x"], data["y"]
if calibration.calibrated:
gx, gy = calibration.pixel_to_world(gx, gy)
goal["x"] = gx
goal["y"] = gy
logger.info(f"Neues Ziel: x={gx:.1f} y={gy:.1f} {'mm' if calibration.calibrated else 'px'}")
elif data.get("type") == "calibrate":
tag_list = [{"id": tid, "x": s["x"], "y": s["y"]} for tid, s in tracker.state.items()]
success = calibration.update(tag_list)
await ws.send_json({"type": "calibration_result", "success": success})
except (WebSocketDisconnect, Exception):
browser_clients.discard(ws)
logger.info(f"Browser getrennt ({len(browser_clients)} verbleibend)")

12
apriltag_ws.py Normal file
View file

@ -0,0 +1,12 @@
"""
apriltag_ws.py DEPRECATED seit v005
Diese Standalone-Version wurde durch app.py + tracker.py ersetzt.
Nicht mehr verwenden. Wird in einer zukünftigen Version entfernt.
Zum Starten des Trackers bitte nutzen:
uvicorn app:app --host 0.0.0.0 --port 8000
"""
raise SystemExit(
"apriltag_ws.py ist deprecated. Bitte 'uvicorn app:app' verwenden."
)

95
calibration.py Normal file
View file

@ -0,0 +1,95 @@
"""
calibration.py Robot Tracker v007
Berechnet planare Homographie aus 4 Referenz-AprilTags.
Wandelt Kamera-Pixel in Weltkoordinaten (mm) um.
"""
import logging
import numpy as np
import cv2
from typing import Optional
logger = logging.getLogger(__name__)
class Calibration:
def __init__(self, reference_tags: dict):
"""
reference_tags: {tag_id: [x_mm, y_mm], ...}
Beispiel: {0: [0, 0], 1: [100.5, 0], 2: [0, 181], 3: [100.5, 181]}
"""
self.reference_tags = reference_tags
self.H: Optional[np.ndarray] = None # Homographie-Matrix Pixel → mm
self.H_inv: Optional[np.ndarray] = None # mm → Pixel
self.calibrated = False
def update(self, detected_tags: list) -> bool:
"""
Versucht Homographie aus aktuell sichtbaren Referenz-Tags zu berechnen.
Gibt True zurück wenn Kalibrierung erfolgreich.
"""
# Nur Referenz-Tags filtern
ref_detected = {
t["id"]: t
for t in detected_tags
if t["id"] in self.reference_tags
}
if len(ref_detected) < 4:
return False
# Punkte aufbauen
pts_pixel = []
pts_world = []
for tag_id, world_pos in self.reference_tags.items():
if tag_id in ref_detected:
tag = ref_detected[tag_id]
pts_pixel.append([tag["x"], tag["y"]])
pts_world.append(world_pos)
if len(pts_pixel) < 4:
return False
src = np.array(pts_pixel, dtype=np.float32)
dst = np.array(pts_world, dtype=np.float32)
H, mask = cv2.findHomography(src, dst, cv2.RANSAC, 5.0)
if H is None:
logger.warning("Homographie-Berechnung fehlgeschlagen")
return False
self.H = H
self.H_inv = np.linalg.inv(H)
self.calibrated = True
logger.info("Kalibrierung erfolgreich!")
return True
def pixel_to_world(self, x_px: float, y_px: float) -> tuple[float, float]:
"""Pixel → Weltkoordinaten (mm)"""
if not self.calibrated:
return x_px, y_px
pt = np.array([[[x_px, y_px]]], dtype=np.float32)
result = cv2.perspectiveTransform(pt, self.H)
return float(result[0][0][0]), float(result[0][0][1])
def world_to_pixel(self, x_mm: float, y_mm: float) -> tuple[float, float]:
"""Weltkoordinaten (mm) → Pixel"""
if not self.calibrated:
return x_mm, y_mm
pt = np.array([[[x_mm, y_mm]]], dtype=np.float32)
result = cv2.perspectiveTransform(pt, self.H_inv)
return float(result[0][0][0]), float(result[0][0][1])
def get_field_corners_px(self) -> Optional[list]:
"""Gibt die Spielfeldecken in Pixel zurück (für Visualisierung)"""
if not self.calibrated:
return None
corners_world = [
[0, 0],
[100.5, 0],
[100.5, 181],
[0, 181],
]
return [self.world_to_pixel(x, y) for x, y in corners_world]

233
collision.py Normal file
View file

@ -0,0 +1,233 @@
"""
collision.py Robot Tracker v010
Stabiler Differentialantrieb-Regler mit Heading-Filter.
Kernverbesserungen gegenüber v009:
1. Kraftvektor wird geglättet (EMA) Kamerarauschen triggert keinen Spin mehr
2. SPIN_THRESHOLD auf 120° erhöht Kurvenfahrt bis 120° möglich
3. Hindernisabstoßung sanfter kein abruptes Umkehren des Zielvektors
4. Heading-Deadband: kleine Winkelfehler (<8°) werden ignoriert keine Zitterkorrekturen
Gewünschtes Verhalten:
- Ausrichten, dann geradeaus fahren
- Kurskorrektur nur wenn Abweichung wirklich signifikant
- Hindernisse smooth umkurven, nicht drum herumdrehen
"""
import math
import logging
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------
# Regler-Parameter
# -----------------------------------------------------------------------
# Spin: nur bei wirklich falscher Richtung
SPIN_THRESHOLD = 90.0 # zurück auf 90° — 120° war zu hoch, Endlos-Spin
SPIN_STOP = 35.0 # Spin beenden
SPIN_SLOWDOWN = 60.0 # ab hier Spin abbremsen
HEADING_DEADBAND = 6.0 # Winkelfehler < 6° → geradeaus
SLOWDOWN_DIST = 150.0
SLOWDOWN_MIN = 0.35
# Kraft-Glättung: höher als 0.15 damit Spin-Korrektur schnell ankommt
# 0.15 war zu träge — Roboter überschoss immer und kam nie unter SPIN_STOP
FORCE_ALPHA = 0.35
# Zustand pro Roboter
_state: dict = {} # robot_id → {"spinning": bool, "fx": float, "fy": float}
def _get_state(robot_id: int) -> dict:
if robot_id not in _state:
_state[robot_id] = {"spinning": False, "fx": 0.0, "fy": 0.0}
return _state[robot_id]
def compute_avoidance(
robot_x: float,
robot_y: float,
goal_x: float,
goal_y: float,
obstacles: list,
safe_distance: float,
repulsion_strength: float,
) -> tuple:
"""
Potentialfeld: Zielanziehung + sanfte Hindernisabstoßung.
Gibt (fx, fy) als normalisierten Richtungsvektor zurück.
Änderungen v010:
- Abstoßung verwendet kubische Abschwächung (nicht quadratisch)
sanfterer Verlauf, kein abrupter Richtungswechsel
- Abstoßung skaliert mit Zieldistanz: weit weg = Hindernis weniger relevant
"""
dx_goal = goal_x - robot_x
dy_goal = goal_y - robot_y
dist_goal = math.sqrt(dx_goal**2 + dy_goal**2)
if dist_goal < 1:
return 0.0, 0.0
fx = dx_goal / dist_goal
fy = dy_goal / dist_goal
for obs in obstacles:
dx = robot_x - obs["x"]
dy = robot_y - obs["y"]
dist = math.sqrt(dx**2 + dy**2)
if dist < 1 or dist >= safe_distance:
continue
# Kubische Abschwächung: stärker nah, aber kein harter Sprung
factor = ((safe_distance - dist) / safe_distance) ** 2
repulsion = repulsion_strength * factor
fx += (dx / dist) * repulsion
fy += (dy / dist) * repulsion
magnitude = math.sqrt(fx**2 + fy**2)
if magnitude < 1e-6:
return 0.0, 0.0
return fx / magnitude, fy / magnitude
def force_to_speeds(
force_x: float,
force_y: float,
robot_theta: float,
speed_base: float,
speed_turn: float,
speed_min: float,
speed_max: float,
angle_threshold: float, # nicht mehr genutzt
axle_width: float,
robot_id: int = 0,
dist_to_goal: float = 9999.0,
) -> dict:
"""
Stabiler Differentialantrieb-Regler mit Heading-Filter.
Der Kraftvektor wird geglättet bevor der Fehler berechnet wird.
Das eliminiert Rauschen-getriebene Spin-Trigger und Zitterkorrekturen.
"""
st = _get_state(robot_id)
# -----------------------------------------------------------------------
# Schritt 1: Kraftvektor glätten (EMA)
# Neue Kameramessung fließt nur zu FORCE_ALPHA ein.
# Ergebnis: kein abrupter Richtungswechsel durch Pixelrauschen
# -----------------------------------------------------------------------
prev_fx = st["fx"]
prev_fy = st["fy"]
if prev_fx == 0.0 and prev_fy == 0.0:
# Erster Aufruf: direkt setzen
st["fx"] = force_x
st["fy"] = force_y
else:
st["fx"] = prev_fx + FORCE_ALPHA * (force_x - prev_fx)
st["fy"] = prev_fy + FORCE_ALPHA * (force_y - prev_fy)
# Geglätteter Vektor normalisieren
mag = math.sqrt(st["fx"]**2 + st["fy"]**2)
if mag < 1e-6:
return {"speed_l": 0.0, "speed_r": 0.0, "status": "idle"}
sfx = st["fx"] / mag
sfy = st["fy"] / mag
# -----------------------------------------------------------------------
# Schritt 2: Winkelfehler aus geglättetem Vektor
# -----------------------------------------------------------------------
target_angle = math.degrees(math.atan2(sfx, -sfy))
angle_error = (target_angle - robot_theta + 180) % 360 - 180
abs_error = abs(angle_error)
sign = 1.0 if angle_error > 0 else -1.0
# -----------------------------------------------------------------------
# Schritt 3: Spin-Entscheidung
# Kein Spin wenn nah am Ziel (immer vorwärts einfahren)
# -----------------------------------------------------------------------
near_goal = dist_to_goal < SLOWDOWN_DIST
if near_goal:
st["spinning"] = False
elif st["spinning"]:
if abs_error < SPIN_STOP:
st["spinning"] = False
else:
if abs_error > SPIN_THRESHOLD:
st["spinning"] = True
# -----------------------------------------------------------------------
# Zone A: Spin auf der Stelle (nur bei >120° Fehler)
# -----------------------------------------------------------------------
if st["spinning"]:
if abs_error > SPIN_SLOWDOWN:
spin_speed = min(speed_turn, max(speed_min, abs_error * 0.8))
else:
t_slow = (abs_error - SPIN_STOP) / max(1.0, SPIN_SLOWDOWN - SPIN_STOP)
t_slow = max(0.0, min(1.0, t_slow))
spin_speed = speed_min + t_slow * (speed_turn - speed_min)
return {
"speed_l": round(spin_speed * sign, 1),
"speed_r": round(-spin_speed * sign, 1),
"status": "spinning",
}
# -----------------------------------------------------------------------
# Zone B: Kurvenfahrt (0120°)
# Differentialantrieb: Außenrad voll, Innenrad reduziert
#
# Deadband: bei < HEADING_DEADBAND geradeaus (kein Zittern)
# -----------------------------------------------------------------------
# Verlangsamung bei Zielanfahrt
if dist_to_goal < SLOWDOWN_DIST:
slowdown = max(SLOWDOWN_MIN, dist_to_goal / SLOWDOWN_DIST)
base = speed_base * slowdown
base = max(speed_min, base)
else:
base = speed_base
if abs_error < HEADING_DEADBAND:
# Geradeaus — kein Lenken
speed_l = base
speed_r = base
status = "straight"
else:
# Normalisierter Fehler: 0 bei DEADBAND, 1.0 bei SPIN_THRESHOLD
effective_error = abs_error - HEADING_DEADBAND
effective_range = SPIN_THRESHOLD - HEADING_DEADBAND
t = min(1.0, effective_error / effective_range)
# Außenrad: voll
outer = base
# Innenrad: von base (t=0) bis -base (t=1)
# Bei t=0.5 (~64°): Innenrad = 0 (enge Kurve)
inner = base * (1.0 - 2.0 * t)
if angle_error > 0:
speed_l = inner
speed_r = outer
else:
speed_l = outer
speed_r = inner
status = "curving"
speed_l = max(-speed_max, min(speed_max, speed_l))
speed_r = max(-speed_max, min(speed_max, speed_r))
return {"speed_l": round(speed_l, 1), "speed_r": round(speed_r, 1), "status": status}
def reset_robot(robot_id: int) -> None:
"""Zustand für einen Roboter zurücksetzen (z.B. bei Verbindungsabbruch)."""
_state.pop(robot_id, None)

81
config.yaml Normal file
View file

@ -0,0 +1,81 @@
# Robot Tracker v009 — Encoder-PID Navigation
camera:
rtsp_url: "rtsp://192.168.0.30:8554/live"
width: 1280
height: 720
rtsp_transport: "udp"
reconnect_delay: 3.0
reconnect_max_attempts: 0
server:
host: "0.0.0.0"
port: 80
broadcast_hz: 30
tracker:
tag_family: "tag36h11"
detector_threads: 2
quad_decimate: 1.0
quad_sigma: 0.0
refine_edges: 1
decode_sharpening: 0.25
ema_alpha_pos: 0.35
ema_alpha_ang: 0.35
ema_alpha_vel: 0.25
hold_seconds: 0.80
predict_seconds: 0.25
robots:
tcp_port: 8765
robot_tag_ids: [91]
goal_radius: 80 # mm — Ankunftsradius
# Geschwindigkeiten in mm/s
speed_base: 160 # mm/s Fahrgeschwindigkeit
speed_turn: 90 # mm/s Drehgeschwindigkeit (nur bei >120°)
speed_min: 50 # mm/s Minimum
speed_max: 350 # mm/s Maximum
angle_threshold: 120 # Grad — jetzt nur noch für spin
# Dead Reckoning
dead_reckoning_timeout: 0.5 # Sekunden ohne Tag → Encoder-Schätzung nutzen
calibration:
reference_tags:
0: [0.0, 0.0 ]
1: [100.5, 0.0 ]
2: [0.0, 181.0]
3: [100.5, 181.0]
min_tags_required: 4
obstacles:
safe_distance: 150
repulsion_strength: 1500 # war 5000 — sanftere Abstoßung, kein Richtungssprung
obstacle_tag_ids:
- 11
- 12
- 21
- 22
- 31
- 32
- 41
- 42
- 51
- 52
- 61
- 62
- 71
- 72
- 81
- 82
- 92
# Roboter Hardware-Parameter für Encoder-PID
robot_hardware:
wheel_diameter_mm: 67.0 # Außendurchmesser Rad
wheel_circumference_mm: 210.5 # π × 67
encoder_pulses_per_rev: 20 # Löcher in der Lochscheibe
mm_per_pulse: 10.525 # Umfang / Pulse = 210.5 / 20
axle_width_mm: 121.0 # Mitte Rad links zu Mitte Rad rechts
pulses_per_360deg: 36.1 # Drehkreis / mm_per_pulse

426
main.py Normal file
View file

@ -0,0 +1,426 @@
"""
main.py ESP32 Roboter Firmware v011
Verbesserungen gegenüber v009/v010:
1. Feedforward-Regler: kS (Haftreibung) + kV (Geschwindigkeit)
PID regelt nur noch den kleinen Restfehler statt alles alleine
kein Überschwingen beim Anfahren, sofort stabile Geschwindigkeit
2. Gleitendes Messfenster für Istgeschwindigkeit (3 Ticks = 150ms)
bei 20 Pulsen/Umdrehung ist 50ms zu grob quantisiertes Rauschen
längeres Fenster = ruhigere PID-Eingabe = weniger Zittern
3. Reduzierte PID-Gains (KP, KI, KD) weil Feedforward die Hauptarbeit macht
4. Anti-Windup: Integral einfrieren bei PWM-Sättigung
5. Getrennte kS/kV pro Rad (links/rechts oft unterschiedlich)
Tuning-Anleitung:
Schritt 1: kS_L, kS_R einstellen
Roboter soll bei Sollgeschwindigkeit 1 mm/s gerade noch anfahren
Typisch 100-180 bei TB6612/L9110S + 3.7V LiPo
Schritt 2: kV_L, kV_R einstellen
Roboter soll bei 160 mm/s Soll ungefähr 160 mm/s Ist fahren (ohne PID)
kV = (PWM_bei_160mmps - kS) / 160
Typisch 2.0-3.5 je nach Motor/Batterie
Schritt 3: KP klein lassen (0.3-0.8), KI sehr klein (0.05-0.15)
Feedforward macht 90% der Arbeit, PID nur Feinkorrektur
"""
import network
import asyncio
import json
from machine import Pin, PWM
import time
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
ROBOT_ID = 91
WIFI_SSID = "FrankeLab"
WIFI_PSW = "FrankeLab@Edu"
SERVER_IP = "192.168.0.250"
SERVER_PORT = 8765
MM_PER_PULSE = 10.525
TELEMETRY_MS = 100
# ---------------------------------------------------------------------------
# Feedforward-Parameter — HIER TUNEN
# ---------------------------------------------------------------------------
# Feedforward deaktiviert (Option B) — reiner PID wie v009
# Zum Aktivieren: KS auf ~150, KV auf ~2.5 setzen und KP/KI reduzieren
# ---------------------------------------------------------------------------
KS_L = 0.0
KS_R = 0.0
KV_L = 0.0
KV_R = 0.0
# PID-Parameter: bewährte v009-Werte
KP = 1.2
KI = 0.3
KD = 0.05
PWM_MIN = 150 # Mindestwert damit Motor anläuft (nur ohne Feedforward)
PWM_MAX = 950
# Gleitendes Messfenster: Anzahl PID-Ticks über die gemittelt wird
# 3 Ticks × 50ms = 150ms Fenster → bei 20 Pulse/Rev viel ruhigeres Signal
SPEED_WINDOW = 3
# Rampe
RAMP_UP = 40.0 # mm/s pro Tick
RAMP_DOWN = 80.0
# ---------------------------------------------------------------------------
# Motoren
# ---------------------------------------------------------------------------
ain1 = Pin(17, Pin.OUT)
ain2 = Pin(18, Pin.OUT)
pwma = PWM(Pin(8), freq=1000)
bin1 = Pin(9, Pin.OUT)
bin2 = Pin(10, Pin.OUT)
pwmb = PWM(Pin(11), freq=1000)
def motors_off():
ain1.value(0); ain2.value(0); pwma.duty(0)
bin1.value(0); bin2.value(0); pwmb.duty(0)
motors_off()
# ---------------------------------------------------------------------------
# Encoder
# ---------------------------------------------------------------------------
enc_l = 0
enc_r = 0
enc_l_total = 0
enc_r_total = 0
def isr_l(p):
global enc_l, enc_l_total
enc_l += 1
enc_l_total += 1
def isr_r(p):
global enc_r, enc_r_total
enc_r += 1
enc_r_total += 1
pin_enc_l = Pin(6, Pin.IN, Pin.PULL_UP)
pin_enc_r = Pin(7, Pin.IN, Pin.PULL_UP)
pin_enc_l.irq(trigger=Pin.IRQ_RISING, handler=isr_l)
pin_enc_r.irq(trigger=Pin.IRQ_RISING, handler=isr_r)
# ---------------------------------------------------------------------------
# PID (jetzt nur noch für Restfehler)
# ---------------------------------------------------------------------------
class PID:
def __init__(self, kp, ki, kd):
self.kp = kp
self.ki = ki
self.kd = kd
self.integral = 0.0
self.last_error = 0.0
self.last_time = time.ticks_ms()
self.saturated = False # für Anti-Windup
def reset(self):
self.integral = 0.0
self.last_error = 0.0
self.last_time = time.ticks_ms()
self.saturated = False
def compute(self, target, actual, pwm_saturated=False):
now = time.ticks_ms()
dt = time.ticks_diff(now, self.last_time) / 1000.0
if dt < 0.001:
dt = 0.001
self.last_time = now
error = target - actual
# Anti-Windup: Integral nur aufbauen wenn nicht gesättigt
# oder wenn Korrektur in richtige Richtung geht
if not pwm_saturated or (error * self.integral < 0):
self.integral += error * dt
self.integral = max(-200.0, min(200.0, self.integral))
derivative = (error - self.last_error) / dt
self.last_error = error
return self.kp * error + self.ki * self.integral + self.kd * derivative
pid_l = PID(KP, KI, KD)
pid_r = PID(KP, KI, KD)
# ---------------------------------------------------------------------------
# Zustandsvariablen
# ---------------------------------------------------------------------------
target_speed_l = 0.0
target_speed_r = 0.0
ramped_speed_l = 0.0
ramped_speed_r = 0.0
current_pwm_l = 0.0
current_pwm_r = 0.0
# Gleitendes Fenster für Istgeschwindigkeit
# Speichert die letzten SPEED_WINDOW Encoder-Deltas + Zeiten
_enc_hist_l = [0] * SPEED_WINDOW # Pulse-Deltas
_enc_hist_r = [0] * SPEED_WINDOW
_dt_hist = [0.05] * SPEED_WINDOW # Zeitdeltas in Sekunden
_hist_idx = 0
# ---------------------------------------------------------------------------
# Rampe
# ---------------------------------------------------------------------------
def apply_ramp(ramped, target):
if target == 0.0:
if ramped > 0:
return max(0.0, ramped - RAMP_DOWN)
elif ramped < 0:
return min(0.0, ramped + RAMP_DOWN)
return 0.0
if (target > 0 and ramped >= 0) or (target < 0 and ramped <= 0):
if abs(ramped) < abs(target):
if target > 0:
return min(target, ramped + RAMP_UP)
else:
return max(target, ramped - RAMP_UP)
elif abs(ramped) > abs(target):
if target > 0:
return max(target, ramped - RAMP_DOWN)
else:
return min(target, ramped + RAMP_DOWN)
return target
else:
# Richtungswechsel: erst auf 0
if ramped > 0:
return max(0.0, ramped - RAMP_DOWN)
else:
return min(0.0, ramped + RAMP_DOWN)
# ---------------------------------------------------------------------------
# Feedforward-Berechnung
# ---------------------------------------------------------------------------
def feedforward(speed_target, ks, kv):
"""
Berechnet PWM-Vorsteuerung aus Sollgeschwindigkeit.
u_ff = kS*sign(v) + kV*|v|
kS überwindet Haftreibung (Motoranlauf)
kV deckt die lineare Kennlinie ab
Ergebnis ist vorzeichenbehaftet (negativ = rückwärts).
"""
if speed_target == 0.0:
return 0.0
sign = 1.0 if speed_target > 0 else -1.0
return sign * (ks + kv * abs(speed_target))
# ---------------------------------------------------------------------------
# Motor setzen
# ---------------------------------------------------------------------------
def set_motor(a1, a2, pwm_obj, pwm_val):
pwm_val = max(-PWM_MAX, min(PWM_MAX, pwm_val))
if pwm_val > 0:
a1.value(1); a2.value(0)
pwm_obj.duty(int(pwm_val))
elif pwm_val < 0:
a1.value(0); a2.value(1)
pwm_obj.duty(int(-pwm_val))
else:
a1.value(0); a2.value(0)
pwm_obj.duty(0)
# ---------------------------------------------------------------------------
# PID Loop mit Feedforward
# ---------------------------------------------------------------------------
async def pid_loop():
global enc_l, enc_r, current_pwm_l, current_pwm_r
global ramped_speed_l, ramped_speed_r
global _enc_hist_l, _enc_hist_r, _dt_hist, _hist_idx
last_enc_l = 0
last_enc_r = 0
last_time = time.ticks_ms()
while True:
await asyncio.sleep_ms(50)
now = time.ticks_ms()
dt = time.ticks_diff(now, last_time) / 1000.0
if dt < 0.001:
dt = 0.001
last_time = now
# --- Rampe ---
ramped_speed_l = apply_ramp(ramped_speed_l, target_speed_l)
ramped_speed_r = apply_ramp(ramped_speed_r, target_speed_r)
# --- Encoder-Delta messen und ins Fenster eintragen ---
delta_l = enc_l - last_enc_l
delta_r = enc_r - last_enc_r
last_enc_l = enc_l
last_enc_r = enc_r
_enc_hist_l[_hist_idx] = delta_l
_enc_hist_r[_hist_idx] = delta_r
_dt_hist[_hist_idx] = dt
_hist_idx = (_hist_idx + 1) % SPEED_WINDOW
# Istgeschwindigkeit aus Summe des Fensters
sum_pulses_l = sum(_enc_hist_l)
sum_pulses_r = sum(_enc_hist_r)
sum_dt = sum(_dt_hist)
if sum_dt > 0:
actual_l = (sum_pulses_l * MM_PER_PULSE) / sum_dt
actual_r = (sum_pulses_r * MM_PER_PULSE) / sum_dt
else:
actual_l = 0.0
actual_r = 0.0
# --- Stopp ---
if ramped_speed_l == 0.0 and ramped_speed_r == 0.0:
motors_off()
current_pwm_l = 0.0
current_pwm_r = 0.0
pid_l.reset()
pid_r.reset()
# Fenster leeren damit nach Stopp kein Phantomspeed
for i in range(SPEED_WINDOW):
_enc_hist_l[i] = 0
_enc_hist_r[i] = 0
continue
# --- Linker Motor: Feedforward + PID ---
if ramped_speed_l != 0.0:
ff_l = feedforward(ramped_speed_l, KS_L, KV_L)
sat_l = abs(current_pwm_l) >= PWM_MAX
pid_out_l = pid_l.compute(abs(ramped_speed_l), actual_l, sat_l)
sign_l = 1.0 if ramped_speed_l > 0 else -1.0
if KS_L == 0.0 and KV_L == 0.0:
# Kein Feedforward: aufaddieren wie v009 damit PWM anwächst
current_pwm_l += pid_out_l * sign_l
if sign_l > 0 and current_pwm_l < PWM_MIN:
current_pwm_l = float(PWM_MIN)
elif sign_l < 0 and current_pwm_l > -PWM_MIN:
current_pwm_l = float(-PWM_MIN)
else:
# Mit Feedforward: direkt setzen, FF macht Grundlast
current_pwm_l = ff_l + pid_out_l * sign_l
else:
current_pwm_l = 0.0
pid_l.reset()
# --- Rechter Motor: Feedforward + PID ---
if ramped_speed_r != 0.0:
ff_r = feedforward(ramped_speed_r, KS_R, KV_R)
sat_r = abs(current_pwm_r) >= PWM_MAX
pid_out_r = pid_r.compute(abs(ramped_speed_r), actual_r, sat_r)
sign_r = 1.0 if ramped_speed_r > 0 else -1.0
if KS_R == 0.0 and KV_R == 0.0:
# Kein Feedforward: aufaddieren wie v009
current_pwm_r += pid_out_r * sign_r
if sign_r > 0 and current_pwm_r < PWM_MIN:
current_pwm_r = float(PWM_MIN)
elif sign_r < 0 and current_pwm_r > -PWM_MIN:
current_pwm_r = float(-PWM_MIN)
else:
# Mit Feedforward: direkt setzen
current_pwm_r = ff_r + pid_out_r * sign_r
else:
current_pwm_r = 0.0
pid_r.reset()
set_motor(ain1, ain2, pwma, current_pwm_l)
set_motor(bin1, bin2, pwmb, current_pwm_r)
# ---------------------------------------------------------------------------
# WiFi
# ---------------------------------------------------------------------------
def wifi_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("WiFi verbinde...")
wlan.connect(WIFI_SSID, WIFI_PSW)
for _ in range(20):
if wlan.isconnected():
break
time.sleep(0.5)
if wlan.isconnected():
print("WiFi OK IP=" + wlan.ifconfig()[0])
return True
print("WiFi fehlgeschlagen")
return False
# ---------------------------------------------------------------------------
# TCP
# ---------------------------------------------------------------------------
async def robot_loop():
global target_speed_l, target_speed_r
if not wifi_connect():
return
while True:
try:
print("Verbinde mit Server...")
reader, writer = await asyncio.open_connection(SERVER_IP, SERVER_PORT)
print("Verbunden!")
hello = json.dumps({"robot_id": ROBOT_ID, "version": "v011"}) + "\n"
writer.write(hello.encode())
await writer.drain()
last_telem = time.ticks_ms()
while True:
now = time.ticks_ms()
if time.ticks_diff(now, last_telem) >= TELEMETRY_MS:
telem = json.dumps({
"robot_id": ROBOT_ID,
"enc_l": enc_l_total,
"enc_r": enc_r_total,
"speed_l": round(ramped_speed_l, 1),
"speed_r": round(ramped_speed_r, 1),
"pwm_l": int(current_pwm_l),
"pwm_r": int(current_pwm_r),
}) + "\n"
writer.write(telem.encode())
await writer.drain()
last_telem = now
try:
line = await asyncio.wait_for(reader.readline(), timeout=0.05)
if line:
cmd = json.loads(line.decode().strip())
if "speed_l" in cmd:
target_speed_l = float(cmd["speed_l"])
target_speed_r = float(cmd["speed_r"])
elif "left" in cmd:
target_speed_l = float(cmd["left"])
target_speed_r = float(cmd["right"])
except asyncio.TimeoutError:
pass
except Exception as e:
print("Fehler: " + str(e))
target_speed_l = 0.0
target_speed_r = 0.0
motors_off()
pid_l.reset()
pid_r.reset()
time.sleep(2)
print("Reconnect...")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main():
asyncio.create_task(pid_loop())
await robot_loop()
asyncio.run(main())

90
navlogger.py Normal file
View file

@ -0,0 +1,90 @@
"""
navlogger.py Robot Tracker v009
Zeichnet alle Navigationsdaten auf.
Kompatibel mit speed_l/speed_r Befehlen (mm/s).
"""
import csv
import logging
import math
import time
from pathlib import Path
logger = logging.getLogger(__name__)
class NavLogger:
def __init__(self, log_dir: str = "logs"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(exist_ok=True)
self.file = None
self.writer = None
self.session_start = None
self._open_new_file()
def _open_new_file(self):
ts = time.strftime("%Y%m%d_%H%M%S")
path = self.log_dir / f"nav_{ts}.csv"
self.file = open(path, "w", newline="")
self.writer = csv.writer(self.file)
self.writer.writerow([
"timestamp",
"elapsed_s",
"robot_x_mm", "robot_y_mm", "robot_theta",
"goal_x_mm", "goal_y_mm",
"dist_to_goal_mm",
"angle_error_deg",
"cmd_speed_l", "cmd_speed_r", "cmd_status",
"num_obstacles",
"nearest_obstacle_id",
"nearest_obstacle_dist_mm",
"all_obstacle_dists",
])
self.session_start = time.time()
logger.info(f"NavLogger: Neue Log-Datei {path}")
def log(
self,
robot_x: float, robot_y: float, robot_theta: float,
goal_x: float, goal_y: float,
cmd: dict,
obstacles: list,
angle_error: float,
):
now = time.time()
elapsed = round(now - self.session_start, 3)
dist_goal = math.sqrt((goal_x - robot_x)**2 + (goal_y - robot_y)**2)
nearest_id = None
nearest_dist = float("inf")
obs_dists = []
for obs in obstacles:
d = math.sqrt((robot_x - obs["x"])**2 + (robot_y - obs["y"])**2)
obs_dists.append(f"{obs['id']}:{d:.0f}")
if d < nearest_dist:
nearest_dist = d
nearest_id = obs["id"]
# Kompatibel mit speed_l/speed_r (v009) und left/right (v008)
speed_l = cmd.get("speed_l", cmd.get("left", 0))
speed_r = cmd.get("speed_r", cmd.get("right", 0))
self.writer.writerow([
round(now, 3),
elapsed,
round(robot_x, 1), round(robot_y, 1), round(robot_theta, 1),
round(goal_x, 1), round(goal_y, 1),
round(dist_goal, 1),
round(angle_error, 1),
round(speed_l, 1), round(speed_r, 1), cmd.get("status", ""),
len(obstacles),
nearest_id,
round(nearest_dist, 1) if nearest_dist < float("inf") else "",
" | ".join(obs_dists),
])
self.file.flush()
def close(self):
if self.file:
self.file.close()

6
requirements.txt Normal file
View file

@ -0,0 +1,6 @@
fastapi>=0.111.0
uvicorn[standard]>=0.29.0
pupil-apriltags>=1.0.4
opencv-python-headless>=4.9.0
numpy>=1.26.0
pyyaml>=6.0.1

2
serverstart.txt Normal file
View file

@ -0,0 +1,2 @@
pkill -f uvicorn; sleep 1; cd /opt/robot-tracker-v010 && /opt/vision-venv/bin/python -m uvicorn app:app --host 0.0.0.0 --port 80

267
static/view.html Normal file
View file

@ -0,0 +1,267 @@
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="utf-8" />
<title>Robot Tracker v009</title>
<style>
html, body { margin: 0; padding: 0; background: #0e0e11; color: #e0e0e0; font-family: system-ui, sans-serif; }
canvas { display: block; margin: auto; background: #14141a; cursor: crosshair; }
#hud {
position: absolute; top: 10px; left: 50%;
transform: translateX(-50%);
display: flex; gap: 10px; align-items: center;
pointer-events: none;
}
#status { font-size: 12px; padding: 4px 12px; border-radius: 99px; background: #1f1f28; color: #888; }
#status.ok { color: #4fd1c5; }
#status.err { color: #f87171; }
#cal-status { font-size: 12px; padding: 4px 12px; border-radius: 99px; background: #1f1f28; color: #f59e0b; }
#cal-status.ok { color: #4fd1c5; }
#cal-btn { font-size: 12px; padding: 4px 14px; border-radius: 99px; background: #4fd1c5; color: #0e0e11; border: none; cursor: pointer; pointer-events: all; font-weight: 600; }
#cal-btn:hover { background: #38b2ac; }
#info { position: absolute; bottom: 16px; left: 50%; transform: translateX(-50%); font-size: 12px; color: #555; pointer-events: none; }
</style>
</head>
<body>
<div id="hud">
<div id="status">Verbinde…</div>
<div id="cal-status">Nicht kalibriert</div>
<button id="cal-btn" onclick="calibrate()">Kalibrieren</button>
</div>
<canvas id="view" width="1280" height="720"></canvas>
<div id="info">Klick → Ziel setzen | Hindernisse = rot markiert | Geschwindigkeit = grüner Balken</div>
<script>
const canvas = document.getElementById("view");
const ctx = canvas.getContext("2d");
const statusEl = document.getElementById("status");
const calEl = document.getElementById("cal-status");
const FADE_SECONDS = 0.8;
const ARROW_LENGTH = 34;
const VEL_SCALE = 0.3;
const VEL_MIN_SHOW = 8;
const SAFE_DIST_PX = 60;
const SPEED_MAX_MMS = 350; // für Balkenanzeige
let goalX = null, goalY = null;
let isCalibrated = false;
let ws;
function drawGrid() {
ctx.clearRect(0, 0, canvas.width, canvas.height);
ctx.strokeStyle = "#1f1f28";
ctx.lineWidth = 1;
const step = 80;
for (let x = 0; x <= canvas.width; x += step) {
ctx.beginPath(); ctx.moveTo(x, 0); ctx.lineTo(x, canvas.height); ctx.stroke();
}
for (let y = 0; y <= canvas.height; y += step) {
ctx.beginPath(); ctx.moveTo(0, y); ctx.lineTo(canvas.width, y); ctx.stroke();
}
ctx.strokeStyle = "#3a3a44";
ctx.strokeRect(0, 0, canvas.width, canvas.height);
}
function drawFieldCorners(corners) {
if (!corners || corners.length < 4) return;
ctx.save();
ctx.strokeStyle = "#1D9E75";
ctx.lineWidth = 1.5;
ctx.setLineDash([8, 4]);
ctx.beginPath();
ctx.moveTo(corners[0][0], corners[0][1]);
ctx.lineTo(corners[1][0], corners[1][1]);
ctx.lineTo(corners[2][0], corners[2][1]);
ctx.lineTo(corners[3][0], corners[3][1]);
ctx.closePath();
ctx.stroke();
ctx.setLineDash([]);
ctx.fillStyle = "#1D9E75";
ctx.font = "11px system-ui";
ctx.fillText("Kalibrierfeld", corners[0][0] + 5, corners[0][1] + 15);
ctx.restore();
}
function drawSpeedBar(x, y, speed_mm_s, speed_l, speed_r) {
if (speed_mm_s === undefined) return;
const barW = 60;
const barH = 6;
const bx = x - barW / 2;
const by = y + 28;
const pct = Math.min(1, Math.abs(speed_mm_s) / SPEED_MAX_MMS);
// Hintergrund
ctx.save();
ctx.fillStyle = "#2a2a35";
ctx.fillRect(bx, by, barW, barH);
// Füllstand
const color = speed_mm_s > 200 ? "#f59e0b" : speed_mm_s > 100 ? "#4fd1c5" : "#1D9E75";
ctx.fillStyle = color;
ctx.fillRect(bx, by, barW * pct, barH);
// Geschwindigkeit als Text
ctx.fillStyle = "#e0e0e0";
ctx.font = "10px system-ui";
ctx.textAlign = "center";
ctx.fillText(Math.round(speed_mm_s) + " mm/s", x, by + 18);
ctx.restore();
}
function drawObstacle(obs) {
const { x, y, id, x_mm, y_mm } = obs;
ctx.save();
ctx.strokeStyle = "rgba(220, 50, 50, 0.25)";
ctx.lineWidth = 1;
ctx.setLineDash([3, 3]);
ctx.beginPath(); ctx.arc(x, y, SAFE_DIST_PX, 0, Math.PI * 2); ctx.stroke();
ctx.setLineDash([]);
ctx.strokeStyle = "#E24B4A";
ctx.lineWidth = 2;
ctx.beginPath(); ctx.arc(x, y, 12, 0, Math.PI * 2); ctx.stroke();
ctx.fillStyle = "#E24B4A";
ctx.font = "11px system-ui";
ctx.textAlign = "center";
ctx.fillText("ID " + id, x, y - 18);
if (x_mm !== undefined) {
ctx.fillStyle = "#888";
ctx.fillText(x_mm.toFixed(0) + "mm, " + y_mm.toFixed(0) + "mm", x, y - 6);
}
ctx.restore();
}
function drawGoal(x, y) {
ctx.save();
ctx.strokeStyle = "#f59e0b";
ctx.lineWidth = 2;
ctx.setLineDash([5, 5]);
ctx.beginPath(); ctx.arc(x, y, 30, 0, Math.PI * 2); ctx.stroke();
ctx.setLineDash([]);
ctx.beginPath();
ctx.moveTo(x - 10, y); ctx.lineTo(x + 10, y);
ctx.moveTo(x, y - 10); ctx.lineTo(x, y + 10);
ctx.stroke();
ctx.fillStyle = "#f59e0b";
ctx.font = "11px system-ui";
ctx.fillText("Ziel", x + 14, y - 14);
ctx.restore();
}
function drawTag(tag) {
const { id, x, y, theta, age, vx = 0, vy = 0, omega = 0, x_mm, y_mm, speed_mm_s, speed_actual_l, speed_actual_r } = tag;
const alpha = Math.max(0, 1.0 - (age ?? 0) / FADE_SECONDS);
if (alpha <= 0) return;
ctx.save();
ctx.globalAlpha = alpha;
ctx.save();
ctx.translate(x, y);
ctx.rotate(theta * Math.PI / 180);
ctx.strokeStyle = "#4fd1c5";
ctx.lineWidth = 2;
ctx.beginPath(); ctx.moveTo(0, 0); ctx.lineTo(0, -ARROW_LENGTH); ctx.stroke();
ctx.beginPath(); ctx.moveTo(-5, -ARROW_LENGTH + 8); ctx.lineTo(0, -ARROW_LENGTH); ctx.lineTo(5, -ARROW_LENGTH + 8); ctx.stroke();
ctx.beginPath(); ctx.arc(0, 0, 5, 0, Math.PI * 2); ctx.stroke();
ctx.restore();
const speed = Math.sqrt(vx * vx + vy * vy);
if (speed >= VEL_MIN_SHOW) {
ctx.save();
ctx.translate(x, y);
ctx.strokeStyle = "#a78bfa";
ctx.lineWidth = 1.5;
ctx.setLineDash([4, 3]);
ctx.beginPath(); ctx.moveTo(0, 0); ctx.lineTo(vx * VEL_SCALE, vy * VEL_SCALE); ctx.stroke();
ctx.restore();
}
ctx.fillStyle = "#e0e0e0";
ctx.font = "11px system-ui";
ctx.textAlign = "left";
ctx.textBaseline = "top";
const lines = [
"ID " + id,
x_mm !== undefined ? x_mm.toFixed(1) + "mm, " + y_mm.toFixed(1) + "mm" : "x=" + x.toFixed(0) + " y=" + y.toFixed(0),
"θ=" + theta.toFixed(1) + "°",
];
// Geschwindigkeit nur für Roboter (wenn speed_mm_s vorhanden)
if (speed_mm_s !== undefined) {
lines.push("L:" + (speed_actual_l||0).toFixed(0) + " R:" + (speed_actual_r||0).toFixed(0) + " mm/s");
}
lines.forEach((line, i) => ctx.fillText(line, x + 10, y + 10 + i * 13));
// Geschwindigkeitsbalken für Roboter
if (speed_mm_s !== undefined) {
drawSpeedBar(x, y, speed_mm_s, speed_actual_l, speed_actual_r);
}
ctx.restore();
}
function calibrate() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "calibrate" }));
calEl.textContent = "Kalibriere…";
}
}
canvas.addEventListener("click", (e) => {
const rect = canvas.getBoundingClientRect();
const scaleX = canvas.width / rect.width;
const scaleY = canvas.height / rect.height;
goalX = (e.clientX - rect.left) * scaleX;
goalY = (e.clientY - rect.top) * scaleY;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "goal", x: goalX, y: goalY }));
}
});
drawGrid();
let reconnectTimer;
function connect() {
ws = new WebSocket("ws://" + location.host + "/ws");
ws.onopen = () => { statusEl.textContent = "Verbunden"; statusEl.className = "ok"; };
ws.onmessage = (ev) => {
const data = JSON.parse(ev.data);
if (data.type === "calibration_result") {
calEl.textContent = data.success ? "Kalibriert ✓" : "Fehlgeschlagen";
calEl.className = data.success ? "ok" : "";
return;
}
if (data.calibrated) {
isCalibrated = true;
calEl.textContent = "Kalibriert ✓";
calEl.className = "ok";
}
drawGrid();
if (data.field_corners_px) drawFieldCorners(data.field_corners_px);
(data.obstacles || []).forEach(drawObstacle);
if (goalX !== null) drawGoal(goalX, goalY);
(data.tags || []).forEach(tag => {
const isObstacle = (data.obstacles || []).some(o => o.id === tag.id);
if (!isObstacle) drawTag(tag);
});
};
ws.onerror = () => { statusEl.textContent = "Verbindungsfehler"; statusEl.className = "err"; };
ws.onclose = () => {
statusEl.textContent = "Getrennt — reconnect…";
statusEl.className = "err";
clearTimeout(reconnectTimer);
reconnectTimer = setTimeout(connect, 2000);
};
}
connect();
</script>
</body>
</html>

60
static/wizard.html Normal file
View file

@ -0,0 +1,60 @@
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="utf-8" />
<title>Robot Tracker v005 Setup</title>
<style>
body { margin: 0; background: #0e0e11; color: #e0e0e0; font-family: system-ui, sans-serif; }
.container { max-width: 720px; margin: 60px auto; padding: 40px; background: #14141a; border-radius: 12px; }
h1 { margin-top: 0; font-size: 22px; }
.badge { display: inline-block; font-size: 11px; padding: 2px 8px; border-radius: 99px; background: #1f2937; color: #4fd1c5; margin-bottom: 24px; }
.step { margin: 18px 0; padding-left: 12px; border-left: 3px solid #4fd1c5; line-height: 1.6; }
.step strong { color: #4fd1c5; }
code { background: #1a1a22; padding: 2px 6px; border-radius: 4px; font-size: 13px; }
ul { margin: 8px 0; padding-left: 20px; }
li { margin: 4px 0; }
button {
margin-top: 32px; padding: 14px 28px; font-size: 16px;
border: none; border-radius: 8px; cursor: pointer;
background: #4fd1c5; color: #0e0e11; font-weight: 600;
}
button:hover { background: #38b2ac; }
</style>
</head>
<body>
<div class="container">
<h1>Robot Tracker</h1>
<span class="badge">v005</span>
<div class="step">
<strong>1.</strong> HuskyLens 2 mit dem lokalen WLAN verbinden.
</div>
<div class="step">
<strong>2.</strong> RTSP-Stream auf der HuskyLens starten:
<ul>
<li>Streaming öffnen</li>
<li>RTSP aktivieren und mit <em>Yes</em> bestätigen</li>
<li>Modus wählen (z.&nbsp;B. Object Tracking)</li>
</ul>
</div>
<div class="step">
<strong>3.</strong> IP-Adresse der Kamera in <code>config.yaml</code> unter
<code>camera.rtsp_url</code> eintragen, falls abweichend.
</div>
<div class="step">
<strong>4.</strong> Kamera über dem Spielfeld montieren — möglichst senkrecht,
Spielfeldrand sollte vollständig sichtbar sein.
</div>
<div class="step">
<strong>5.</strong> AprilTag-Marker (Familie <code>tag36h11</code>) auf den
Robotern befestigen — flach aufliegen, deutlich sichtbar von oben.
</div>
<button onclick="location.href='/view'">Tracker starten →</button>
</div>
</body>
</html>

318
tracker.py Normal file
View file

@ -0,0 +1,318 @@
"""
tracker.py Robot Tracker v005
Kamera-Capture und AprilTag-Erkennung laufen in einem dedizierten
Hintergrund-Thread. Der asyncio Event-Loop wird nie blockiert.
Ein Watchdog-Loop startet ffmpeg bei Verbindungsabbruch automatisch neu.
"""
import asyncio
import logging
import math
import subprocess
import threading
import time
from typing import Optional
import cv2
import numpy as np
from pupil_apriltags import Detector
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def ema(a: float, b: float, alpha: float) -> float:
return a + alpha * (b - a)
def ema_angle_deg(prev_deg: float, new_deg: float, alpha: float) -> float:
"""EMA über den Einheitskreis — verhindert Sprung bei ±180°."""
px = math.cos(math.radians(prev_deg))
py = math.sin(math.radians(prev_deg))
nx = math.cos(math.radians(new_deg))
ny = math.sin(math.radians(new_deg))
return math.degrees(math.atan2(ema(py, ny, alpha), ema(px, nx, alpha)))
def wrap_angle_deg(a: float) -> float:
return (a + 180.0) % 360.0 - 180.0
# ---------------------------------------------------------------------------
# TagTracker
# ---------------------------------------------------------------------------
class TagTracker:
"""
Liest Frames von einem RTSP-Stream via ffmpeg-Subprocess,
erkennt AprilTags und pflegt einen geglätteten Zustandsvektor
pro Tag (Position, Winkel, Geschwindigkeit).
Der gesamte I/O läuft in einem eigenen Thread. Der Event-Loop
holt Ergebnisse aus einer asyncio.Queue (maxsize=1), d.h. er
bekommt immer das neueste verfügbare Ergebnis.
"""
def __init__(self, cfg: dict):
cam = cfg["camera"]
trk = cfg["tracker"]
self.rtsp_url = cam["rtsp_url"]
self.width = cam["width"]
self.height = cam["height"]
self.rtsp_transport = cam.get("rtsp_transport", "udp")
self.reconnect_delay = cam.get("reconnect_delay", 3.0)
self.max_attempts = cam.get("reconnect_max_attempts", 0)
self.frame_bytes = self.width * self.height * 3
self.alpha_pos = trk["ema_alpha_pos"]
self.alpha_ang = trk["ema_alpha_ang"]
self.alpha_vel = trk["ema_alpha_vel"]
self.hold_seconds = trk["hold_seconds"]
self.pred_seconds = trk["predict_seconds"]
self.detector = Detector(
families = trk["tag_family"],
nthreads = trk["detector_threads"],
quad_decimate = trk["quad_decimate"],
quad_sigma = trk["quad_sigma"],
refine_edges = trk["refine_edges"],
decode_sharpening= trk["decode_sharpening"],
)
self.state: dict = {}
# Queue zwischen Producer-Thread und asyncio-Consumer
# maxsize=1: der Consumer bekommt immer das neueste Frame,
# ältere werden verworfen.
self._queue: Optional[asyncio.Queue] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._ffmpeg: Optional[subprocess.Popen] = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def start(self, loop: asyncio.AbstractEventLoop) -> None:
"""Startet den Producer-Thread. Muss mit dem laufenden Event-Loop aufgerufen werden."""
self._loop = loop
self._queue = asyncio.Queue(maxsize=1)
self._stop_event.clear()
self._thread = threading.Thread(target=self._producer_loop, daemon=True, name="tracker-producer")
self._thread.start()
logger.info("TagTracker gestartet")
def stop(self) -> None:
"""Stoppt den Producer-Thread und beendet ffmpeg sauber."""
self._stop_event.set()
self._kill_ffmpeg()
if self._thread:
self._thread.join(timeout=5.0)
logger.info("TagTracker gestoppt")
# ------------------------------------------------------------------
# Async Interface für den Event-Loop
# ------------------------------------------------------------------
async def get_tags(self) -> list:
"""
Wartet auf das nächste Ergebnis aus dem Producer-Thread.
Gibt eine leere Liste zurück wenn innerhalb von 0.5 s nichts kommt
(z.B. während eines Reconnects).
"""
if self._queue is None:
return []
try:
return await asyncio.wait_for(self._queue.get(), timeout=0.5)
except asyncio.TimeoutError:
return []
# ------------------------------------------------------------------
# Producer-Thread (läuft komplett außerhalb des Event-Loops)
# ------------------------------------------------------------------
def _producer_loop(self) -> None:
"""
Watchdog-Loop: startet ffmpeg, liest Frames, erkennt Tags,
stellt Ergebnis in die Queue. Startet ffmpeg bei Fehler neu.
"""
attempts = 0
while not self._stop_event.is_set():
attempts += 1
if self.max_attempts > 0 and attempts > self.max_attempts:
logger.error("Maximale Reconnect-Versuche erreicht. Producer stoppt.")
break
logger.info(f"ffmpeg starten (Versuch {attempts})… URL: {self.rtsp_url}")
proc = self._spawn_ffmpeg()
if proc is None:
self._stop_event.wait(self.reconnect_delay)
continue
self._ffmpeg = proc
read_errors = 0
try:
while not self._stop_event.is_set():
raw = proc.stdout.read(self.frame_bytes)
if len(raw) != self.frame_bytes:
read_errors += 1
if read_errors >= 3:
logger.warning("Zu viele Frame-Lesefehler — Reconnect")
break
continue
read_errors = 0
tags = self._process_frame(raw)
self._enqueue(tags)
except Exception as e:
logger.error(f"Fehler im Producer-Thread: {e}")
finally:
self._kill_ffmpeg()
if not self._stop_event.is_set():
logger.info(f"Reconnect in {self.reconnect_delay:.1f}s…")
self._stop_event.wait(self.reconnect_delay)
def _spawn_ffmpeg(self) -> Optional[subprocess.Popen]:
cmd = [
"ffmpeg",
"-rtsp_transport", self.rtsp_transport,
"-fflags", "nobuffer",
"-flags", "low_delay",
"-probesize", "32",
"-analyzeduration","0",
"-vsync", "0",
"-i", self.rtsp_url,
"-an", "-sn", "-dn",
"-pix_fmt", "bgr24",
"-f", "rawvideo",
"-",
]
try:
return subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
bufsize=self.frame_bytes,
)
except FileNotFoundError:
logger.error("ffmpeg nicht gefunden — bitte installieren")
return None
except Exception as e:
logger.error(f"ffmpeg-Start fehlgeschlagen: {e}")
return None
def _kill_ffmpeg(self) -> None:
proc = self._ffmpeg
if proc and proc.poll() is None:
try:
proc.terminate()
proc.wait(timeout=3.0)
except Exception:
proc.kill()
self._ffmpeg = None
def _enqueue(self, tags: list) -> None:
"""Stellt Tags in die Queue. Älteres Ergebnis wird verworfen wenn voll."""
if self._queue is None or self._loop is None:
return
# Aus dem Thread heraus: Thread-safe über call_soon_threadsafe
self._loop.call_soon_threadsafe(self._queue_put_nowait, tags)
def _queue_put_nowait(self, tags: list) -> None:
"""Läuft im Event-Loop-Thread. Tauscht älteres Ergebnis aus."""
if self._queue is None:
return
if self._queue.full():
try:
self._queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
self._queue.put_nowait(tags)
except asyncio.QueueFull:
pass
# ------------------------------------------------------------------
# Frame-Verarbeitung (läuft im Producer-Thread)
# ------------------------------------------------------------------
def _process_frame(self, raw: bytes) -> list:
frame = np.frombuffer(raw, dtype=np.uint8).reshape(
(self.height, self.width, 3)
)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
detections = self.detector.detect(gray)
now = time.time()
for d in detections:
tag_id = int(d.tag_id)
cx, cy = float(d.center[0]), float(d.center[1])
p0, p1 = d.corners[0], d.corners[1]
theta_meas = float(np.degrees(
np.arctan2(p1[0] - p0[0], -(p1[1] - p0[1]))
))
if tag_id not in self.state:
self.state[tag_id] = {
"x": cx, "y": cy, "theta": theta_meas,
"vx": 0.0, "vy": 0.0, "omega": 0.0,
"last_meas": now, "last_pose_ts": now,
"lx": cx, "ly": cy, "lt": theta_meas,
}
else:
s = self.state[tag_id]
x = ema(s["x"], cx, self.alpha_pos)
y = ema(s["y"], cy, self.alpha_pos)
th = ema_angle_deg(s["theta"], theta_meas, self.alpha_ang)
dt = now - s["last_pose_ts"]
if dt > 1e-3:
s["vx"] = ema(s["vx"], (x - s["lx"]) / dt, self.alpha_vel)
s["vy"] = ema(s["vy"], (y - s["ly"]) / dt, self.alpha_vel)
s["omega"] = ema(s["omega"], wrap_angle_deg(th - s["lt"]) / dt, self.alpha_vel)
s.update({
"x": x, "y": y, "theta": th,
"last_meas": now, "last_pose_ts": now,
"lx": x, "ly": y, "lt": th,
})
out = []
to_del = []
for tag_id, s in self.state.items():
age = now - s["last_meas"]
if age <= self.hold_seconds:
if age <= self.pred_seconds:
x = s["x"] + s["vx"] * age
y = s["y"] + s["vy"] * age
th = wrap_angle_deg(s["theta"] + s["omega"] * age)
else:
x, y, th = s["x"], s["y"], s["theta"]
out.append({
"id": tag_id,
"x": round(x, 2),
"y": round(y, 2),
"theta": round(th, 2),
"age": round(age, 3),
"vx": round(s["vx"], 2),
"vy": round(s["vy"], 2),
"omega": round(s["omega"], 2),
})
else:
to_del.append(tag_id)
for tag_id in to_del:
del self.state[tag_id]
return out