HoloDeck_Robot_System/main.py

426 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
main.py — ESP32 Roboter Firmware v011
Verbesserungen gegenüber v009/v010:
1. Feedforward-Regler: kS (Haftreibung) + kV (Geschwindigkeit)
→ PID regelt nur noch den kleinen Restfehler statt alles alleine
→ kein Überschwingen beim Anfahren, sofort stabile Geschwindigkeit
2. Gleitendes Messfenster für Istgeschwindigkeit (3 Ticks = 150ms)
→ bei 20 Pulsen/Umdrehung ist 50ms zu grob — quantisiertes Rauschen
→ längeres Fenster = ruhigere PID-Eingabe = weniger Zittern
3. Reduzierte PID-Gains (KP, KI, KD) weil Feedforward die Hauptarbeit macht
4. Anti-Windup: Integral einfrieren bei PWM-Sättigung
5. Getrennte kS/kV pro Rad (links/rechts oft unterschiedlich)
Tuning-Anleitung:
Schritt 1: kS_L, kS_R einstellen
→ Roboter soll bei Sollgeschwindigkeit 1 mm/s gerade noch anfahren
→ Typisch 100-180 bei TB6612/L9110S + 3.7V LiPo
Schritt 2: kV_L, kV_R einstellen
→ Roboter soll bei 160 mm/s Soll ungefähr 160 mm/s Ist fahren (ohne PID)
→ kV = (PWM_bei_160mmps - kS) / 160
→ Typisch 2.0-3.5 je nach Motor/Batterie
Schritt 3: KP klein lassen (0.3-0.8), KI sehr klein (0.05-0.15)
→ Feedforward macht 90% der Arbeit, PID nur Feinkorrektur
"""
import network
import asyncio
import json
from machine import Pin, PWM
import time
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
ROBOT_ID = 91
WIFI_SSID = "FrankeLab"
WIFI_PSW = "FrankeLab@Edu"
SERVER_IP = "192.168.0.250"
SERVER_PORT = 8765
MM_PER_PULSE = 10.525
TELEMETRY_MS = 100
# ---------------------------------------------------------------------------
# Feedforward-Parameter — HIER TUNEN
# ---------------------------------------------------------------------------
# Feedforward deaktiviert (Option B) — reiner PID wie v009
# Zum Aktivieren: KS auf ~150, KV auf ~2.5 setzen und KP/KI reduzieren
# ---------------------------------------------------------------------------
KS_L = 0.0
KS_R = 0.0
KV_L = 0.0
KV_R = 0.0
# PID-Parameter: bewährte v009-Werte
KP = 1.2
KI = 0.3
KD = 0.05
PWM_MIN = 150 # Mindestwert damit Motor anläuft (nur ohne Feedforward)
PWM_MAX = 950
# Gleitendes Messfenster: Anzahl PID-Ticks über die gemittelt wird
# 3 Ticks × 50ms = 150ms Fenster → bei 20 Pulse/Rev viel ruhigeres Signal
SPEED_WINDOW = 3
# Rampe
RAMP_UP = 40.0 # mm/s pro Tick
RAMP_DOWN = 80.0
# ---------------------------------------------------------------------------
# Motoren
# ---------------------------------------------------------------------------
ain1 = Pin(17, Pin.OUT)
ain2 = Pin(18, Pin.OUT)
pwma = PWM(Pin(8), freq=1000)
bin1 = Pin(9, Pin.OUT)
bin2 = Pin(10, Pin.OUT)
pwmb = PWM(Pin(11), freq=1000)
def motors_off():
ain1.value(0); ain2.value(0); pwma.duty(0)
bin1.value(0); bin2.value(0); pwmb.duty(0)
motors_off()
# ---------------------------------------------------------------------------
# Encoder
# ---------------------------------------------------------------------------
enc_l = 0
enc_r = 0
enc_l_total = 0
enc_r_total = 0
def isr_l(p):
global enc_l, enc_l_total
enc_l += 1
enc_l_total += 1
def isr_r(p):
global enc_r, enc_r_total
enc_r += 1
enc_r_total += 1
pin_enc_l = Pin(6, Pin.IN, Pin.PULL_UP)
pin_enc_r = Pin(7, Pin.IN, Pin.PULL_UP)
pin_enc_l.irq(trigger=Pin.IRQ_RISING, handler=isr_l)
pin_enc_r.irq(trigger=Pin.IRQ_RISING, handler=isr_r)
# ---------------------------------------------------------------------------
# PID (jetzt nur noch für Restfehler)
# ---------------------------------------------------------------------------
class PID:
def __init__(self, kp, ki, kd):
self.kp = kp
self.ki = ki
self.kd = kd
self.integral = 0.0
self.last_error = 0.0
self.last_time = time.ticks_ms()
self.saturated = False # für Anti-Windup
def reset(self):
self.integral = 0.0
self.last_error = 0.0
self.last_time = time.ticks_ms()
self.saturated = False
def compute(self, target, actual, pwm_saturated=False):
now = time.ticks_ms()
dt = time.ticks_diff(now, self.last_time) / 1000.0
if dt < 0.001:
dt = 0.001
self.last_time = now
error = target - actual
# Anti-Windup: Integral nur aufbauen wenn nicht gesättigt
# oder wenn Korrektur in richtige Richtung geht
if not pwm_saturated or (error * self.integral < 0):
self.integral += error * dt
self.integral = max(-200.0, min(200.0, self.integral))
derivative = (error - self.last_error) / dt
self.last_error = error
return self.kp * error + self.ki * self.integral + self.kd * derivative
pid_l = PID(KP, KI, KD)
pid_r = PID(KP, KI, KD)
# ---------------------------------------------------------------------------
# Zustandsvariablen
# ---------------------------------------------------------------------------
target_speed_l = 0.0
target_speed_r = 0.0
ramped_speed_l = 0.0
ramped_speed_r = 0.0
current_pwm_l = 0.0
current_pwm_r = 0.0
# Gleitendes Fenster für Istgeschwindigkeit
# Speichert die letzten SPEED_WINDOW Encoder-Deltas + Zeiten
_enc_hist_l = [0] * SPEED_WINDOW # Pulse-Deltas
_enc_hist_r = [0] * SPEED_WINDOW
_dt_hist = [0.05] * SPEED_WINDOW # Zeitdeltas in Sekunden
_hist_idx = 0
# ---------------------------------------------------------------------------
# Rampe
# ---------------------------------------------------------------------------
def apply_ramp(ramped, target):
if target == 0.0:
if ramped > 0:
return max(0.0, ramped - RAMP_DOWN)
elif ramped < 0:
return min(0.0, ramped + RAMP_DOWN)
return 0.0
if (target > 0 and ramped >= 0) or (target < 0 and ramped <= 0):
if abs(ramped) < abs(target):
if target > 0:
return min(target, ramped + RAMP_UP)
else:
return max(target, ramped - RAMP_UP)
elif abs(ramped) > abs(target):
if target > 0:
return max(target, ramped - RAMP_DOWN)
else:
return min(target, ramped + RAMP_DOWN)
return target
else:
# Richtungswechsel: erst auf 0
if ramped > 0:
return max(0.0, ramped - RAMP_DOWN)
else:
return min(0.0, ramped + RAMP_DOWN)
# ---------------------------------------------------------------------------
# Feedforward-Berechnung
# ---------------------------------------------------------------------------
def feedforward(speed_target, ks, kv):
"""
Berechnet PWM-Vorsteuerung aus Sollgeschwindigkeit.
u_ff = kS*sign(v) + kV*|v|
kS überwindet Haftreibung (Motoranlauf)
kV deckt die lineare Kennlinie ab
Ergebnis ist vorzeichenbehaftet (negativ = rückwärts).
"""
if speed_target == 0.0:
return 0.0
sign = 1.0 if speed_target > 0 else -1.0
return sign * (ks + kv * abs(speed_target))
# ---------------------------------------------------------------------------
# Motor setzen
# ---------------------------------------------------------------------------
def set_motor(a1, a2, pwm_obj, pwm_val):
pwm_val = max(-PWM_MAX, min(PWM_MAX, pwm_val))
if pwm_val > 0:
a1.value(1); a2.value(0)
pwm_obj.duty(int(pwm_val))
elif pwm_val < 0:
a1.value(0); a2.value(1)
pwm_obj.duty(int(-pwm_val))
else:
a1.value(0); a2.value(0)
pwm_obj.duty(0)
# ---------------------------------------------------------------------------
# PID Loop mit Feedforward
# ---------------------------------------------------------------------------
async def pid_loop():
global enc_l, enc_r, current_pwm_l, current_pwm_r
global ramped_speed_l, ramped_speed_r
global _enc_hist_l, _enc_hist_r, _dt_hist, _hist_idx
last_enc_l = 0
last_enc_r = 0
last_time = time.ticks_ms()
while True:
await asyncio.sleep_ms(50)
now = time.ticks_ms()
dt = time.ticks_diff(now, last_time) / 1000.0
if dt < 0.001:
dt = 0.001
last_time = now
# --- Rampe ---
ramped_speed_l = apply_ramp(ramped_speed_l, target_speed_l)
ramped_speed_r = apply_ramp(ramped_speed_r, target_speed_r)
# --- Encoder-Delta messen und ins Fenster eintragen ---
delta_l = enc_l - last_enc_l
delta_r = enc_r - last_enc_r
last_enc_l = enc_l
last_enc_r = enc_r
_enc_hist_l[_hist_idx] = delta_l
_enc_hist_r[_hist_idx] = delta_r
_dt_hist[_hist_idx] = dt
_hist_idx = (_hist_idx + 1) % SPEED_WINDOW
# Istgeschwindigkeit aus Summe des Fensters
sum_pulses_l = sum(_enc_hist_l)
sum_pulses_r = sum(_enc_hist_r)
sum_dt = sum(_dt_hist)
if sum_dt > 0:
actual_l = (sum_pulses_l * MM_PER_PULSE) / sum_dt
actual_r = (sum_pulses_r * MM_PER_PULSE) / sum_dt
else:
actual_l = 0.0
actual_r = 0.0
# --- Stopp ---
if ramped_speed_l == 0.0 and ramped_speed_r == 0.0:
motors_off()
current_pwm_l = 0.0
current_pwm_r = 0.0
pid_l.reset()
pid_r.reset()
# Fenster leeren damit nach Stopp kein Phantomspeed
for i in range(SPEED_WINDOW):
_enc_hist_l[i] = 0
_enc_hist_r[i] = 0
continue
# --- Linker Motor: Feedforward + PID ---
if ramped_speed_l != 0.0:
ff_l = feedforward(ramped_speed_l, KS_L, KV_L)
sat_l = abs(current_pwm_l) >= PWM_MAX
pid_out_l = pid_l.compute(abs(ramped_speed_l), actual_l, sat_l)
sign_l = 1.0 if ramped_speed_l > 0 else -1.0
if KS_L == 0.0 and KV_L == 0.0:
# Kein Feedforward: aufaddieren wie v009 damit PWM anwächst
current_pwm_l += pid_out_l * sign_l
if sign_l > 0 and current_pwm_l < PWM_MIN:
current_pwm_l = float(PWM_MIN)
elif sign_l < 0 and current_pwm_l > -PWM_MIN:
current_pwm_l = float(-PWM_MIN)
else:
# Mit Feedforward: direkt setzen, FF macht Grundlast
current_pwm_l = ff_l + pid_out_l * sign_l
else:
current_pwm_l = 0.0
pid_l.reset()
# --- Rechter Motor: Feedforward + PID ---
if ramped_speed_r != 0.0:
ff_r = feedforward(ramped_speed_r, KS_R, KV_R)
sat_r = abs(current_pwm_r) >= PWM_MAX
pid_out_r = pid_r.compute(abs(ramped_speed_r), actual_r, sat_r)
sign_r = 1.0 if ramped_speed_r > 0 else -1.0
if KS_R == 0.0 and KV_R == 0.0:
# Kein Feedforward: aufaddieren wie v009
current_pwm_r += pid_out_r * sign_r
if sign_r > 0 and current_pwm_r < PWM_MIN:
current_pwm_r = float(PWM_MIN)
elif sign_r < 0 and current_pwm_r > -PWM_MIN:
current_pwm_r = float(-PWM_MIN)
else:
# Mit Feedforward: direkt setzen
current_pwm_r = ff_r + pid_out_r * sign_r
else:
current_pwm_r = 0.0
pid_r.reset()
set_motor(ain1, ain2, pwma, current_pwm_l)
set_motor(bin1, bin2, pwmb, current_pwm_r)
# ---------------------------------------------------------------------------
# WiFi
# ---------------------------------------------------------------------------
def wifi_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("WiFi verbinde...")
wlan.connect(WIFI_SSID, WIFI_PSW)
for _ in range(20):
if wlan.isconnected():
break
time.sleep(0.5)
if wlan.isconnected():
print("WiFi OK IP=" + wlan.ifconfig()[0])
return True
print("WiFi fehlgeschlagen")
return False
# ---------------------------------------------------------------------------
# TCP
# ---------------------------------------------------------------------------
async def robot_loop():
global target_speed_l, target_speed_r
if not wifi_connect():
return
while True:
try:
print("Verbinde mit Server...")
reader, writer = await asyncio.open_connection(SERVER_IP, SERVER_PORT)
print("Verbunden!")
hello = json.dumps({"robot_id": ROBOT_ID, "version": "v011"}) + "\n"
writer.write(hello.encode())
await writer.drain()
last_telem = time.ticks_ms()
while True:
now = time.ticks_ms()
if time.ticks_diff(now, last_telem) >= TELEMETRY_MS:
telem = json.dumps({
"robot_id": ROBOT_ID,
"enc_l": enc_l_total,
"enc_r": enc_r_total,
"speed_l": round(ramped_speed_l, 1),
"speed_r": round(ramped_speed_r, 1),
"pwm_l": int(current_pwm_l),
"pwm_r": int(current_pwm_r),
}) + "\n"
writer.write(telem.encode())
await writer.drain()
last_telem = now
try:
line = await asyncio.wait_for(reader.readline(), timeout=0.05)
if line:
cmd = json.loads(line.decode().strip())
if "speed_l" in cmd:
target_speed_l = float(cmd["speed_l"])
target_speed_r = float(cmd["speed_r"])
elif "left" in cmd:
target_speed_l = float(cmd["left"])
target_speed_r = float(cmd["right"])
except asyncio.TimeoutError:
pass
except Exception as e:
print("Fehler: " + str(e))
target_speed_l = 0.0
target_speed_r = 0.0
motors_off()
pid_l.reset()
pid_r.reset()
time.sleep(2)
print("Reconnect...")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main():
asyncio.create_task(pid_loop())
await robot_loop()
asyncio.run(main())