""" main.py — ESP32 Roboter Firmware v011 Verbesserungen gegenüber v009/v010: 1. Feedforward-Regler: kS (Haftreibung) + kV (Geschwindigkeit) → PID regelt nur noch den kleinen Restfehler statt alles alleine → kein Überschwingen beim Anfahren, sofort stabile Geschwindigkeit 2. Gleitendes Messfenster für Istgeschwindigkeit (3 Ticks = 150ms) → bei 20 Pulsen/Umdrehung ist 50ms zu grob — quantisiertes Rauschen → längeres Fenster = ruhigere PID-Eingabe = weniger Zittern 3. Reduzierte PID-Gains (KP, KI, KD) weil Feedforward die Hauptarbeit macht 4. Anti-Windup: Integral einfrieren bei PWM-Sättigung 5. Getrennte kS/kV pro Rad (links/rechts oft unterschiedlich) Tuning-Anleitung: Schritt 1: kS_L, kS_R einstellen → Roboter soll bei Sollgeschwindigkeit 1 mm/s gerade noch anfahren → Typisch 100-180 bei TB6612/L9110S + 3.7V LiPo Schritt 2: kV_L, kV_R einstellen → Roboter soll bei 160 mm/s Soll ungefähr 160 mm/s Ist fahren (ohne PID) → kV = (PWM_bei_160mmps - kS) / 160 → Typisch 2.0-3.5 je nach Motor/Batterie Schritt 3: KP klein lassen (0.3-0.8), KI sehr klein (0.05-0.15) → Feedforward macht 90% der Arbeit, PID nur Feinkorrektur """ import network import asyncio import json from machine import Pin, PWM import time # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- ROBOT_ID = 91 WIFI_SSID = "FrankeLab" WIFI_PSW = "FrankeLab@Edu" SERVER_IP = "192.168.0.250" SERVER_PORT = 8765 MM_PER_PULSE = 10.525 TELEMETRY_MS = 100 # --------------------------------------------------------------------------- # Feedforward-Parameter — HIER TUNEN # --------------------------------------------------------------------------- # Feedforward deaktiviert (Option B) — reiner PID wie v009 # Zum Aktivieren: KS auf ~150, KV auf ~2.5 setzen und KP/KI reduzieren # --------------------------------------------------------------------------- KS_L = 0.0 KS_R = 0.0 KV_L = 0.0 KV_R = 0.0 # PID-Parameter: bewährte v009-Werte KP = 1.2 KI = 0.3 KD = 0.05 PWM_MIN = 150 # Mindestwert damit Motor anläuft (nur ohne Feedforward) PWM_MAX = 950 # Gleitendes Messfenster: Anzahl PID-Ticks über die gemittelt wird # 3 Ticks × 50ms = 150ms Fenster → bei 20 Pulse/Rev viel ruhigeres Signal SPEED_WINDOW = 3 # Rampe RAMP_UP = 40.0 # mm/s pro Tick RAMP_DOWN = 80.0 # --------------------------------------------------------------------------- # Motoren # --------------------------------------------------------------------------- ain1 = Pin(17, Pin.OUT) ain2 = Pin(18, Pin.OUT) pwma = PWM(Pin(8), freq=1000) bin1 = Pin(9, Pin.OUT) bin2 = Pin(10, Pin.OUT) pwmb = PWM(Pin(11), freq=1000) def motors_off(): ain1.value(0); ain2.value(0); pwma.duty(0) bin1.value(0); bin2.value(0); pwmb.duty(0) motors_off() # --------------------------------------------------------------------------- # Encoder # --------------------------------------------------------------------------- enc_l = 0 enc_r = 0 enc_l_total = 0 enc_r_total = 0 def isr_l(p): global enc_l, enc_l_total enc_l += 1 enc_l_total += 1 def isr_r(p): global enc_r, enc_r_total enc_r += 1 enc_r_total += 1 pin_enc_l = Pin(6, Pin.IN, Pin.PULL_UP) pin_enc_r = Pin(7, Pin.IN, Pin.PULL_UP) pin_enc_l.irq(trigger=Pin.IRQ_RISING, handler=isr_l) pin_enc_r.irq(trigger=Pin.IRQ_RISING, handler=isr_r) # --------------------------------------------------------------------------- # PID (jetzt nur noch für Restfehler) # --------------------------------------------------------------------------- class PID: def __init__(self, kp, ki, kd): self.kp = kp self.ki = ki self.kd = kd self.integral = 0.0 self.last_error = 0.0 self.last_time = time.ticks_ms() self.saturated = False # für Anti-Windup def reset(self): self.integral = 0.0 self.last_error = 0.0 self.last_time = time.ticks_ms() self.saturated = False def compute(self, target, actual, pwm_saturated=False): now = time.ticks_ms() dt = time.ticks_diff(now, self.last_time) / 1000.0 if dt < 0.001: dt = 0.001 self.last_time = now error = target - actual # Anti-Windup: Integral nur aufbauen wenn nicht gesättigt # oder wenn Korrektur in richtige Richtung geht if not pwm_saturated or (error * self.integral < 0): self.integral += error * dt self.integral = max(-200.0, min(200.0, self.integral)) derivative = (error - self.last_error) / dt self.last_error = error return self.kp * error + self.ki * self.integral + self.kd * derivative pid_l = PID(KP, KI, KD) pid_r = PID(KP, KI, KD) # --------------------------------------------------------------------------- # Zustandsvariablen # --------------------------------------------------------------------------- target_speed_l = 0.0 target_speed_r = 0.0 ramped_speed_l = 0.0 ramped_speed_r = 0.0 current_pwm_l = 0.0 current_pwm_r = 0.0 # Gleitendes Fenster für Istgeschwindigkeit # Speichert die letzten SPEED_WINDOW Encoder-Deltas + Zeiten _enc_hist_l = [0] * SPEED_WINDOW # Pulse-Deltas _enc_hist_r = [0] * SPEED_WINDOW _dt_hist = [0.05] * SPEED_WINDOW # Zeitdeltas in Sekunden _hist_idx = 0 # --------------------------------------------------------------------------- # Rampe # --------------------------------------------------------------------------- def apply_ramp(ramped, target): if target == 0.0: if ramped > 0: return max(0.0, ramped - RAMP_DOWN) elif ramped < 0: return min(0.0, ramped + RAMP_DOWN) return 0.0 if (target > 0 and ramped >= 0) or (target < 0 and ramped <= 0): if abs(ramped) < abs(target): if target > 0: return min(target, ramped + RAMP_UP) else: return max(target, ramped - RAMP_UP) elif abs(ramped) > abs(target): if target > 0: return max(target, ramped - RAMP_DOWN) else: return min(target, ramped + RAMP_DOWN) return target else: # Richtungswechsel: erst auf 0 if ramped > 0: return max(0.0, ramped - RAMP_DOWN) else: return min(0.0, ramped + RAMP_DOWN) # --------------------------------------------------------------------------- # Feedforward-Berechnung # --------------------------------------------------------------------------- def feedforward(speed_target, ks, kv): """ Berechnet PWM-Vorsteuerung aus Sollgeschwindigkeit. u_ff = kS*sign(v) + kV*|v| kS überwindet Haftreibung (Motoranlauf) kV deckt die lineare Kennlinie ab Ergebnis ist vorzeichenbehaftet (negativ = rückwärts). """ if speed_target == 0.0: return 0.0 sign = 1.0 if speed_target > 0 else -1.0 return sign * (ks + kv * abs(speed_target)) # --------------------------------------------------------------------------- # Motor setzen # --------------------------------------------------------------------------- def set_motor(a1, a2, pwm_obj, pwm_val): pwm_val = max(-PWM_MAX, min(PWM_MAX, pwm_val)) if pwm_val > 0: a1.value(1); a2.value(0) pwm_obj.duty(int(pwm_val)) elif pwm_val < 0: a1.value(0); a2.value(1) pwm_obj.duty(int(-pwm_val)) else: a1.value(0); a2.value(0) pwm_obj.duty(0) # --------------------------------------------------------------------------- # PID Loop mit Feedforward # --------------------------------------------------------------------------- async def pid_loop(): global enc_l, enc_r, current_pwm_l, current_pwm_r global ramped_speed_l, ramped_speed_r global _enc_hist_l, _enc_hist_r, _dt_hist, _hist_idx last_enc_l = 0 last_enc_r = 0 last_time = time.ticks_ms() while True: await asyncio.sleep_ms(50) now = time.ticks_ms() dt = time.ticks_diff(now, last_time) / 1000.0 if dt < 0.001: dt = 0.001 last_time = now # --- Rampe --- ramped_speed_l = apply_ramp(ramped_speed_l, target_speed_l) ramped_speed_r = apply_ramp(ramped_speed_r, target_speed_r) # --- Encoder-Delta messen und ins Fenster eintragen --- delta_l = enc_l - last_enc_l delta_r = enc_r - last_enc_r last_enc_l = enc_l last_enc_r = enc_r _enc_hist_l[_hist_idx] = delta_l _enc_hist_r[_hist_idx] = delta_r _dt_hist[_hist_idx] = dt _hist_idx = (_hist_idx + 1) % SPEED_WINDOW # Istgeschwindigkeit aus Summe des Fensters sum_pulses_l = sum(_enc_hist_l) sum_pulses_r = sum(_enc_hist_r) sum_dt = sum(_dt_hist) if sum_dt > 0: actual_l = (sum_pulses_l * MM_PER_PULSE) / sum_dt actual_r = (sum_pulses_r * MM_PER_PULSE) / sum_dt else: actual_l = 0.0 actual_r = 0.0 # --- Stopp --- if ramped_speed_l == 0.0 and ramped_speed_r == 0.0: motors_off() current_pwm_l = 0.0 current_pwm_r = 0.0 pid_l.reset() pid_r.reset() # Fenster leeren damit nach Stopp kein Phantomspeed for i in range(SPEED_WINDOW): _enc_hist_l[i] = 0 _enc_hist_r[i] = 0 continue # --- Linker Motor: Feedforward + PID --- if ramped_speed_l != 0.0: ff_l = feedforward(ramped_speed_l, KS_L, KV_L) sat_l = abs(current_pwm_l) >= PWM_MAX pid_out_l = pid_l.compute(abs(ramped_speed_l), actual_l, sat_l) sign_l = 1.0 if ramped_speed_l > 0 else -1.0 if KS_L == 0.0 and KV_L == 0.0: # Kein Feedforward: aufaddieren wie v009 damit PWM anwächst current_pwm_l += pid_out_l * sign_l if sign_l > 0 and current_pwm_l < PWM_MIN: current_pwm_l = float(PWM_MIN) elif sign_l < 0 and current_pwm_l > -PWM_MIN: current_pwm_l = float(-PWM_MIN) else: # Mit Feedforward: direkt setzen, FF macht Grundlast current_pwm_l = ff_l + pid_out_l * sign_l else: current_pwm_l = 0.0 pid_l.reset() # --- Rechter Motor: Feedforward + PID --- if ramped_speed_r != 0.0: ff_r = feedforward(ramped_speed_r, KS_R, KV_R) sat_r = abs(current_pwm_r) >= PWM_MAX pid_out_r = pid_r.compute(abs(ramped_speed_r), actual_r, sat_r) sign_r = 1.0 if ramped_speed_r > 0 else -1.0 if KS_R == 0.0 and KV_R == 0.0: # Kein Feedforward: aufaddieren wie v009 current_pwm_r += pid_out_r * sign_r if sign_r > 0 and current_pwm_r < PWM_MIN: current_pwm_r = float(PWM_MIN) elif sign_r < 0 and current_pwm_r > -PWM_MIN: current_pwm_r = float(-PWM_MIN) else: # Mit Feedforward: direkt setzen current_pwm_r = ff_r + pid_out_r * sign_r else: current_pwm_r = 0.0 pid_r.reset() set_motor(ain1, ain2, pwma, current_pwm_l) set_motor(bin1, bin2, pwmb, current_pwm_r) # --------------------------------------------------------------------------- # WiFi # --------------------------------------------------------------------------- def wifi_connect(): wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print("WiFi verbinde...") wlan.connect(WIFI_SSID, WIFI_PSW) for _ in range(20): if wlan.isconnected(): break time.sleep(0.5) if wlan.isconnected(): print("WiFi OK IP=" + wlan.ifconfig()[0]) return True print("WiFi fehlgeschlagen") return False # --------------------------------------------------------------------------- # TCP # --------------------------------------------------------------------------- async def robot_loop(): global target_speed_l, target_speed_r if not wifi_connect(): return while True: try: print("Verbinde mit Server...") reader, writer = await asyncio.open_connection(SERVER_IP, SERVER_PORT) print("Verbunden!") hello = json.dumps({"robot_id": ROBOT_ID, "version": "v011"}) + "\n" writer.write(hello.encode()) await writer.drain() last_telem = time.ticks_ms() while True: now = time.ticks_ms() if time.ticks_diff(now, last_telem) >= TELEMETRY_MS: telem = json.dumps({ "robot_id": ROBOT_ID, "enc_l": enc_l_total, "enc_r": enc_r_total, "speed_l": round(ramped_speed_l, 1), "speed_r": round(ramped_speed_r, 1), "pwm_l": int(current_pwm_l), "pwm_r": int(current_pwm_r), }) + "\n" writer.write(telem.encode()) await writer.drain() last_telem = now try: line = await asyncio.wait_for(reader.readline(), timeout=0.05) if line: cmd = json.loads(line.decode().strip()) if "speed_l" in cmd: target_speed_l = float(cmd["speed_l"]) target_speed_r = float(cmd["speed_r"]) elif "left" in cmd: target_speed_l = float(cmd["left"]) target_speed_r = float(cmd["right"]) except asyncio.TimeoutError: pass except Exception as e: print("Fehler: " + str(e)) target_speed_l = 0.0 target_speed_r = 0.0 motors_off() pid_l.reset() pid_r.reset() time.sleep(2) print("Reconnect...") # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- async def main(): asyncio.create_task(pid_loop()) await robot_loop() asyncio.run(main())