426 lines
14 KiB
Python
426 lines
14 KiB
Python
"""
|
||
main.py — ESP32 Roboter Firmware v011
|
||
Verbesserungen gegenüber v009/v010:
|
||
1. Feedforward-Regler: kS (Haftreibung) + kV (Geschwindigkeit)
|
||
→ PID regelt nur noch den kleinen Restfehler statt alles alleine
|
||
→ kein Überschwingen beim Anfahren, sofort stabile Geschwindigkeit
|
||
2. Gleitendes Messfenster für Istgeschwindigkeit (3 Ticks = 150ms)
|
||
→ bei 20 Pulsen/Umdrehung ist 50ms zu grob — quantisiertes Rauschen
|
||
→ längeres Fenster = ruhigere PID-Eingabe = weniger Zittern
|
||
3. Reduzierte PID-Gains (KP, KI, KD) weil Feedforward die Hauptarbeit macht
|
||
4. Anti-Windup: Integral einfrieren bei PWM-Sättigung
|
||
5. Getrennte kS/kV pro Rad (links/rechts oft unterschiedlich)
|
||
|
||
Tuning-Anleitung:
|
||
Schritt 1: kS_L, kS_R einstellen
|
||
→ Roboter soll bei Sollgeschwindigkeit 1 mm/s gerade noch anfahren
|
||
→ Typisch 100-180 bei TB6612/L9110S + 3.7V LiPo
|
||
Schritt 2: kV_L, kV_R einstellen
|
||
→ Roboter soll bei 160 mm/s Soll ungefähr 160 mm/s Ist fahren (ohne PID)
|
||
→ kV = (PWM_bei_160mmps - kS) / 160
|
||
→ Typisch 2.0-3.5 je nach Motor/Batterie
|
||
Schritt 3: KP klein lassen (0.3-0.8), KI sehr klein (0.05-0.15)
|
||
→ Feedforward macht 90% der Arbeit, PID nur Feinkorrektur
|
||
"""
|
||
|
||
import network
|
||
import asyncio
|
||
import json
|
||
from machine import Pin, PWM
|
||
import time
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
ROBOT_ID = 91
|
||
WIFI_SSID = "FrankeLab"
|
||
WIFI_PSW = "FrankeLab@Edu"
|
||
SERVER_IP = "192.168.0.250"
|
||
SERVER_PORT = 8765
|
||
|
||
MM_PER_PULSE = 10.525
|
||
TELEMETRY_MS = 100
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Feedforward-Parameter — HIER TUNEN
|
||
# ---------------------------------------------------------------------------
|
||
# Feedforward deaktiviert (Option B) — reiner PID wie v009
|
||
# Zum Aktivieren: KS auf ~150, KV auf ~2.5 setzen und KP/KI reduzieren
|
||
# ---------------------------------------------------------------------------
|
||
KS_L = 0.0
|
||
KS_R = 0.0
|
||
KV_L = 0.0
|
||
KV_R = 0.0
|
||
|
||
# PID-Parameter: bewährte v009-Werte
|
||
KP = 1.2
|
||
KI = 0.3
|
||
KD = 0.05
|
||
|
||
PWM_MIN = 150 # Mindestwert damit Motor anläuft (nur ohne Feedforward)
|
||
PWM_MAX = 950
|
||
|
||
# Gleitendes Messfenster: Anzahl PID-Ticks über die gemittelt wird
|
||
# 3 Ticks × 50ms = 150ms Fenster → bei 20 Pulse/Rev viel ruhigeres Signal
|
||
SPEED_WINDOW = 3
|
||
|
||
# Rampe
|
||
RAMP_UP = 40.0 # mm/s pro Tick
|
||
RAMP_DOWN = 80.0
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Motoren
|
||
# ---------------------------------------------------------------------------
|
||
ain1 = Pin(17, Pin.OUT)
|
||
ain2 = Pin(18, Pin.OUT)
|
||
pwma = PWM(Pin(8), freq=1000)
|
||
|
||
bin1 = Pin(9, Pin.OUT)
|
||
bin2 = Pin(10, Pin.OUT)
|
||
pwmb = PWM(Pin(11), freq=1000)
|
||
|
||
def motors_off():
|
||
ain1.value(0); ain2.value(0); pwma.duty(0)
|
||
bin1.value(0); bin2.value(0); pwmb.duty(0)
|
||
|
||
motors_off()
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Encoder
|
||
# ---------------------------------------------------------------------------
|
||
enc_l = 0
|
||
enc_r = 0
|
||
enc_l_total = 0
|
||
enc_r_total = 0
|
||
|
||
def isr_l(p):
|
||
global enc_l, enc_l_total
|
||
enc_l += 1
|
||
enc_l_total += 1
|
||
|
||
def isr_r(p):
|
||
global enc_r, enc_r_total
|
||
enc_r += 1
|
||
enc_r_total += 1
|
||
|
||
pin_enc_l = Pin(6, Pin.IN, Pin.PULL_UP)
|
||
pin_enc_r = Pin(7, Pin.IN, Pin.PULL_UP)
|
||
pin_enc_l.irq(trigger=Pin.IRQ_RISING, handler=isr_l)
|
||
pin_enc_r.irq(trigger=Pin.IRQ_RISING, handler=isr_r)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# PID (jetzt nur noch für Restfehler)
|
||
# ---------------------------------------------------------------------------
|
||
class PID:
|
||
def __init__(self, kp, ki, kd):
|
||
self.kp = kp
|
||
self.ki = ki
|
||
self.kd = kd
|
||
self.integral = 0.0
|
||
self.last_error = 0.0
|
||
self.last_time = time.ticks_ms()
|
||
self.saturated = False # für Anti-Windup
|
||
|
||
def reset(self):
|
||
self.integral = 0.0
|
||
self.last_error = 0.0
|
||
self.last_time = time.ticks_ms()
|
||
self.saturated = False
|
||
|
||
def compute(self, target, actual, pwm_saturated=False):
|
||
now = time.ticks_ms()
|
||
dt = time.ticks_diff(now, self.last_time) / 1000.0
|
||
if dt < 0.001:
|
||
dt = 0.001
|
||
self.last_time = now
|
||
|
||
error = target - actual
|
||
|
||
# Anti-Windup: Integral nur aufbauen wenn nicht gesättigt
|
||
# oder wenn Korrektur in richtige Richtung geht
|
||
if not pwm_saturated or (error * self.integral < 0):
|
||
self.integral += error * dt
|
||
self.integral = max(-200.0, min(200.0, self.integral))
|
||
|
||
derivative = (error - self.last_error) / dt
|
||
self.last_error = error
|
||
|
||
return self.kp * error + self.ki * self.integral + self.kd * derivative
|
||
|
||
|
||
pid_l = PID(KP, KI, KD)
|
||
pid_r = PID(KP, KI, KD)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Zustandsvariablen
|
||
# ---------------------------------------------------------------------------
|
||
target_speed_l = 0.0
|
||
target_speed_r = 0.0
|
||
|
||
ramped_speed_l = 0.0
|
||
ramped_speed_r = 0.0
|
||
|
||
current_pwm_l = 0.0
|
||
current_pwm_r = 0.0
|
||
|
||
# Gleitendes Fenster für Istgeschwindigkeit
|
||
# Speichert die letzten SPEED_WINDOW Encoder-Deltas + Zeiten
|
||
_enc_hist_l = [0] * SPEED_WINDOW # Pulse-Deltas
|
||
_enc_hist_r = [0] * SPEED_WINDOW
|
||
_dt_hist = [0.05] * SPEED_WINDOW # Zeitdeltas in Sekunden
|
||
_hist_idx = 0
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rampe
|
||
# ---------------------------------------------------------------------------
|
||
def apply_ramp(ramped, target):
|
||
if target == 0.0:
|
||
if ramped > 0:
|
||
return max(0.0, ramped - RAMP_DOWN)
|
||
elif ramped < 0:
|
||
return min(0.0, ramped + RAMP_DOWN)
|
||
return 0.0
|
||
if (target > 0 and ramped >= 0) or (target < 0 and ramped <= 0):
|
||
if abs(ramped) < abs(target):
|
||
if target > 0:
|
||
return min(target, ramped + RAMP_UP)
|
||
else:
|
||
return max(target, ramped - RAMP_UP)
|
||
elif abs(ramped) > abs(target):
|
||
if target > 0:
|
||
return max(target, ramped - RAMP_DOWN)
|
||
else:
|
||
return min(target, ramped + RAMP_DOWN)
|
||
return target
|
||
else:
|
||
# Richtungswechsel: erst auf 0
|
||
if ramped > 0:
|
||
return max(0.0, ramped - RAMP_DOWN)
|
||
else:
|
||
return min(0.0, ramped + RAMP_DOWN)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Feedforward-Berechnung
|
||
# ---------------------------------------------------------------------------
|
||
def feedforward(speed_target, ks, kv):
|
||
"""
|
||
Berechnet PWM-Vorsteuerung aus Sollgeschwindigkeit.
|
||
u_ff = kS*sign(v) + kV*|v|
|
||
|
||
kS überwindet Haftreibung (Motoranlauf)
|
||
kV deckt die lineare Kennlinie ab
|
||
|
||
Ergebnis ist vorzeichenbehaftet (negativ = rückwärts).
|
||
"""
|
||
if speed_target == 0.0:
|
||
return 0.0
|
||
sign = 1.0 if speed_target > 0 else -1.0
|
||
return sign * (ks + kv * abs(speed_target))
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Motor setzen
|
||
# ---------------------------------------------------------------------------
|
||
def set_motor(a1, a2, pwm_obj, pwm_val):
|
||
pwm_val = max(-PWM_MAX, min(PWM_MAX, pwm_val))
|
||
if pwm_val > 0:
|
||
a1.value(1); a2.value(0)
|
||
pwm_obj.duty(int(pwm_val))
|
||
elif pwm_val < 0:
|
||
a1.value(0); a2.value(1)
|
||
pwm_obj.duty(int(-pwm_val))
|
||
else:
|
||
a1.value(0); a2.value(0)
|
||
pwm_obj.duty(0)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# PID Loop mit Feedforward
|
||
# ---------------------------------------------------------------------------
|
||
async def pid_loop():
|
||
global enc_l, enc_r, current_pwm_l, current_pwm_r
|
||
global ramped_speed_l, ramped_speed_r
|
||
global _enc_hist_l, _enc_hist_r, _dt_hist, _hist_idx
|
||
|
||
last_enc_l = 0
|
||
last_enc_r = 0
|
||
last_time = time.ticks_ms()
|
||
|
||
while True:
|
||
await asyncio.sleep_ms(50)
|
||
|
||
now = time.ticks_ms()
|
||
dt = time.ticks_diff(now, last_time) / 1000.0
|
||
if dt < 0.001:
|
||
dt = 0.001
|
||
last_time = now
|
||
|
||
# --- Rampe ---
|
||
ramped_speed_l = apply_ramp(ramped_speed_l, target_speed_l)
|
||
ramped_speed_r = apply_ramp(ramped_speed_r, target_speed_r)
|
||
|
||
# --- Encoder-Delta messen und ins Fenster eintragen ---
|
||
delta_l = enc_l - last_enc_l
|
||
delta_r = enc_r - last_enc_r
|
||
last_enc_l = enc_l
|
||
last_enc_r = enc_r
|
||
|
||
_enc_hist_l[_hist_idx] = delta_l
|
||
_enc_hist_r[_hist_idx] = delta_r
|
||
_dt_hist[_hist_idx] = dt
|
||
_hist_idx = (_hist_idx + 1) % SPEED_WINDOW
|
||
|
||
# Istgeschwindigkeit aus Summe des Fensters
|
||
sum_pulses_l = sum(_enc_hist_l)
|
||
sum_pulses_r = sum(_enc_hist_r)
|
||
sum_dt = sum(_dt_hist)
|
||
|
||
if sum_dt > 0:
|
||
actual_l = (sum_pulses_l * MM_PER_PULSE) / sum_dt
|
||
actual_r = (sum_pulses_r * MM_PER_PULSE) / sum_dt
|
||
else:
|
||
actual_l = 0.0
|
||
actual_r = 0.0
|
||
|
||
# --- Stopp ---
|
||
if ramped_speed_l == 0.0 and ramped_speed_r == 0.0:
|
||
motors_off()
|
||
current_pwm_l = 0.0
|
||
current_pwm_r = 0.0
|
||
pid_l.reset()
|
||
pid_r.reset()
|
||
# Fenster leeren damit nach Stopp kein Phantomspeed
|
||
for i in range(SPEED_WINDOW):
|
||
_enc_hist_l[i] = 0
|
||
_enc_hist_r[i] = 0
|
||
continue
|
||
|
||
# --- Linker Motor: Feedforward + PID ---
|
||
if ramped_speed_l != 0.0:
|
||
ff_l = feedforward(ramped_speed_l, KS_L, KV_L)
|
||
sat_l = abs(current_pwm_l) >= PWM_MAX
|
||
pid_out_l = pid_l.compute(abs(ramped_speed_l), actual_l, sat_l)
|
||
sign_l = 1.0 if ramped_speed_l > 0 else -1.0
|
||
if KS_L == 0.0 and KV_L == 0.0:
|
||
# Kein Feedforward: aufaddieren wie v009 damit PWM anwächst
|
||
current_pwm_l += pid_out_l * sign_l
|
||
if sign_l > 0 and current_pwm_l < PWM_MIN:
|
||
current_pwm_l = float(PWM_MIN)
|
||
elif sign_l < 0 and current_pwm_l > -PWM_MIN:
|
||
current_pwm_l = float(-PWM_MIN)
|
||
else:
|
||
# Mit Feedforward: direkt setzen, FF macht Grundlast
|
||
current_pwm_l = ff_l + pid_out_l * sign_l
|
||
else:
|
||
current_pwm_l = 0.0
|
||
pid_l.reset()
|
||
|
||
# --- Rechter Motor: Feedforward + PID ---
|
||
if ramped_speed_r != 0.0:
|
||
ff_r = feedforward(ramped_speed_r, KS_R, KV_R)
|
||
sat_r = abs(current_pwm_r) >= PWM_MAX
|
||
pid_out_r = pid_r.compute(abs(ramped_speed_r), actual_r, sat_r)
|
||
sign_r = 1.0 if ramped_speed_r > 0 else -1.0
|
||
if KS_R == 0.0 and KV_R == 0.0:
|
||
# Kein Feedforward: aufaddieren wie v009
|
||
current_pwm_r += pid_out_r * sign_r
|
||
if sign_r > 0 and current_pwm_r < PWM_MIN:
|
||
current_pwm_r = float(PWM_MIN)
|
||
elif sign_r < 0 and current_pwm_r > -PWM_MIN:
|
||
current_pwm_r = float(-PWM_MIN)
|
||
else:
|
||
# Mit Feedforward: direkt setzen
|
||
current_pwm_r = ff_r + pid_out_r * sign_r
|
||
else:
|
||
current_pwm_r = 0.0
|
||
pid_r.reset()
|
||
|
||
set_motor(ain1, ain2, pwma, current_pwm_l)
|
||
set_motor(bin1, bin2, pwmb, current_pwm_r)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# WiFi
|
||
# ---------------------------------------------------------------------------
|
||
def wifi_connect():
|
||
wlan = network.WLAN(network.STA_IF)
|
||
wlan.active(True)
|
||
if not wlan.isconnected():
|
||
print("WiFi verbinde...")
|
||
wlan.connect(WIFI_SSID, WIFI_PSW)
|
||
for _ in range(20):
|
||
if wlan.isconnected():
|
||
break
|
||
time.sleep(0.5)
|
||
if wlan.isconnected():
|
||
print("WiFi OK IP=" + wlan.ifconfig()[0])
|
||
return True
|
||
print("WiFi fehlgeschlagen")
|
||
return False
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# TCP
|
||
# ---------------------------------------------------------------------------
|
||
async def robot_loop():
|
||
global target_speed_l, target_speed_r
|
||
|
||
if not wifi_connect():
|
||
return
|
||
|
||
while True:
|
||
try:
|
||
print("Verbinde mit Server...")
|
||
reader, writer = await asyncio.open_connection(SERVER_IP, SERVER_PORT)
|
||
print("Verbunden!")
|
||
|
||
hello = json.dumps({"robot_id": ROBOT_ID, "version": "v011"}) + "\n"
|
||
writer.write(hello.encode())
|
||
await writer.drain()
|
||
|
||
last_telem = time.ticks_ms()
|
||
|
||
while True:
|
||
now = time.ticks_ms()
|
||
|
||
if time.ticks_diff(now, last_telem) >= TELEMETRY_MS:
|
||
telem = json.dumps({
|
||
"robot_id": ROBOT_ID,
|
||
"enc_l": enc_l_total,
|
||
"enc_r": enc_r_total,
|
||
"speed_l": round(ramped_speed_l, 1),
|
||
"speed_r": round(ramped_speed_r, 1),
|
||
"pwm_l": int(current_pwm_l),
|
||
"pwm_r": int(current_pwm_r),
|
||
}) + "\n"
|
||
writer.write(telem.encode())
|
||
await writer.drain()
|
||
last_telem = now
|
||
|
||
try:
|
||
line = await asyncio.wait_for(reader.readline(), timeout=0.05)
|
||
if line:
|
||
cmd = json.loads(line.decode().strip())
|
||
if "speed_l" in cmd:
|
||
target_speed_l = float(cmd["speed_l"])
|
||
target_speed_r = float(cmd["speed_r"])
|
||
elif "left" in cmd:
|
||
target_speed_l = float(cmd["left"])
|
||
target_speed_r = float(cmd["right"])
|
||
except asyncio.TimeoutError:
|
||
pass
|
||
|
||
except Exception as e:
|
||
print("Fehler: " + str(e))
|
||
target_speed_l = 0.0
|
||
target_speed_r = 0.0
|
||
motors_off()
|
||
pid_l.reset()
|
||
pid_r.reset()
|
||
time.sleep(2)
|
||
print("Reconnect...")
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
async def main():
|
||
asyncio.create_task(pid_loop())
|
||
await robot_loop()
|
||
|
||
asyncio.run(main())
|