Misty-Rhetorik-Coach/start_coaching.py

173 lines
5.4 KiB
Python

import websocket
import json
import requests
import time
import threading
import os
from config import MISTY_IP
from analyse import analysiere, gib_feedback, setze_status
DATEI_NAME = "aufnahme.wav"
DATEN_DATEI = "session_daten.json"
laeuft_gerade = False
aufnahme_laeuft = False
aufnahme_start = 0
def setze_neutral():
try:
requests.post(f"http://{MISTY_IP}/api/images/display",
json={"FileName": "e_DefaultContent.jpg"}, timeout=5)
except Exception:
pass
def reset_session_daten():
with open(DATEN_DATEI, "w", encoding="utf-8") as f:
json.dump({"status": "wartend", "sessions": [], "fehler": None}, f, ensure_ascii=False, indent=2)
print("--- Session-Daten zurückgesetzt ---")
def pruefe_misty_verbindung():
try:
r = requests.get(f"http://{MISTY_IP}/api/device", timeout=5)
return r.status_code == 200
except Exception:
return False
def setze_fehler(nachricht):
if os.path.exists(DATEN_DATEI):
with open(DATEN_DATEI, "r", encoding="utf-8") as f:
daten = json.load(f)
else:
daten = {"status": "fehler", "sessions": [], "fehler": nachricht}
daten["status"] = "fehler"
daten["fehler"] = nachricht
with open(DATEN_DATEI, "w", encoding="utf-8") as f:
json.dump(daten, f, ensure_ascii=False, indent=2)
def coaching_prozess():
global laeuft_gerade, aufnahme_laeuft, aufnahme_start
laeuft_gerade = True
aufnahme_laeuft = True
setze_neutral()
setze_status("aufnehmend")
print("--- Fuß gedrückt! Starte Coaching ---")
try:
requests.post(f"http://{MISTY_IP}/api/tts/speak", json={
"text": "Ich höre dir zu. Drücke meinen Fuß nochmal wenn du fertig bist.",
"voice": "de-de-x-deb-local"
}, timeout=5)
except Exception as e:
print(f"❌ Fehler bei TTS: {e}")
time.sleep(4)
print("--- Aufnahme läuft ---")
try:
requests.post(f"http://{MISTY_IP}/api/audio/record/start",
json={"FileName": DATEI_NAME}, timeout=5)
except Exception as e:
print(f"❌ Fehler beim Starten der Aufnahme: {e}")
setze_fehler(f"Aufnahme konnte nicht gestartet werden: {e}")
laeuft_gerade = False
return
aufnahme_start = time.time()
while aufnahme_laeuft:
time.sleep(0.1)
echte_dauer = time.time() - aufnahme_start
print(f"--- Aufnahme gestoppt ({round(echte_dauer)} Sekunden) ---")
try:
requests.post(f"http://{MISTY_IP}/api/audio/record/stop", timeout=5)
except Exception as e:
print(f"❌ Fehler beim Stoppen der Aufnahme: {e}")
time.sleep(2)
print("--- Übertragung der Audio-Datei ---")
try:
r = requests.get(f"http://{MISTY_IP}/api/audio?FileName={DATEI_NAME}", timeout=20)
if r.status_code == 200:
with open(DATEI_NAME, 'wb') as f:
f.write(r.content)
else:
print("Download-Fehler!")
setze_fehler("Audiodatei konnte nicht heruntergeladen werden.")
setze_status("wartend")
laeuft_gerade = False
return
except Exception as e:
print(f"❌ Fehler beim Download: {e}")
setze_fehler(f"Verbindung zu Misty verloren: {e}")
setze_status("wartend")
laeuft_gerade = False
return
anzahl, text, gefundene_woerter, tempo = analysiere(DATEI_NAME, echte_dauer)
gib_feedback(anzahl, gefundene_woerter, tempo, text)
setze_status("wartend")
laeuft_gerade = False
def on_message(ws, message):
global laeuft_gerade, aufnahme_laeuft
data = json.loads(message)
if "message" in data and "sensorId" in data["message"]:
sensor = data["message"]["sensorId"]
is_pressed = data["message"]["isContacted"]
if sensor in ["bfr", "bfl"] and is_pressed:
if not laeuft_gerade:
threading.Thread(target=coaching_prozess).start()
elif aufnahme_laeuft:
print("--- Fuß gedrückt! Aufnahme wird beendet ---")
aufnahme_laeuft = False
def on_open(ws):
print("Verbindung zu Misty hergestellt. Drücke einen FUSS zum Starten.")
reset_session_daten()
setze_neutral()
setze_status("wartend")
subscribe_msg = {
"Operation": "subscribe",
"Type": "BumpSensor",
"DebounceMs": 50,
"EventName": "BumperPress",
"ReturnProperty": None
}
ws.send(json.dumps(subscribe_msg))
def on_error(ws, error):
print(f"❌ WebSocket Fehler: {error}")
setze_fehler(f"WebSocket Fehler: {error}")
def on_close(ws, close_status_code, close_msg):
print(f"❌ Verbindung getrennt: {close_status_code} - {close_msg}")
setze_neutral()
setze_status("wartend")
# Misty Verbindung prüfen vor dem Start
print("--- Prüfe Verbindung zu Misty ---")
if not pruefe_misty_verbindung():
print(f"❌ Misty nicht erreichbar unter {MISTY_IP}")
print("Bitte IP in config.py prüfen und Misty einschalten!")
setze_fehler(f"Misty nicht erreichbar unter {MISTY_IP}. Bitte IP prüfen und Misty einschalten.")
else:
print(f"✅ Misty erreichbar unter {MISTY_IP}")
ws = websocket.WebSocketApp(
f"ws://{MISTY_IP}/pubsub",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close
)
try:
ws.run_forever()
except KeyboardInterrupt:
print("\n--- Programm beendet ---")
setze_neutral()
setze_status("wartend")