97 lines
3.1 KiB
Python
97 lines
3.1 KiB
Python
import websocket
|
|
import json
|
|
import requests
|
|
import time
|
|
import whisper
|
|
import threading
|
|
|
|
# --- KONFIGURATION ---
|
|
MISTY_IP = "192.168.68.58"
|
|
DATEI_NAME = "bumper_aufnahme.wav"
|
|
AUFNAHME_DAUER = 10
|
|
|
|
# Flag, um zu verhindern, dass die Analyse mehrfach gleichzeitig startet
|
|
laeuft_gerade = False
|
|
|
|
def coaching_prozess():
|
|
global laeuft_gerade
|
|
laeuft_gerade = True
|
|
|
|
# 1. Start-Signal
|
|
print("--- Bumper gedrückt! Starte Coaching ---")
|
|
requests.post(f"http://{MISTY_IP}/api/tts/speak", json={
|
|
"text": "Bumper erkannt. Ich höre dir jetzt für 10 Sekunden zu.",
|
|
"speechLocale": "de-DE"
|
|
})
|
|
time.sleep(4)
|
|
|
|
# 2. Aufnahme
|
|
print(f"--- Aufnahme läuft ({AUFNAHME_DAUER}s) ---")
|
|
requests.post(f"http://{MISTY_IP}/api/audio/record/start", json={"FileName": DATEI_NAME})
|
|
time.sleep(AUFNAHME_DAUER)
|
|
requests.post(f"http://{MISTY_IP}/api/audio/record/stop")
|
|
time.sleep(2)
|
|
|
|
# 3. Download
|
|
print("--- Übertragung der Audio-Datei ---")
|
|
r = requests.get(f"http://{MISTY_IP}/api/audio?FileName={DATEI_NAME}")
|
|
if r.status_code == 200:
|
|
with open(DATEI_NAME, 'wb') as f:
|
|
f.write(r.content)
|
|
else:
|
|
print("Download-Fehler!"); laeuft_gerade = False; return
|
|
|
|
# 4. KI-Analyse
|
|
print("--- Whisper Analyse läuft ---")
|
|
model = whisper.load_model("base")
|
|
result = model.transcribe(DATEI_NAME, language="German", initial_prompt="Äh, ähm.")
|
|
text = result["text"].lower()
|
|
|
|
# Zählen
|
|
anzahl_aehm = text.count("ähm")
|
|
anzahl_aeh = text.replace("ähm", "TEMP").count("äh")
|
|
gesamt = anzahl_aeh + anzahl_aehm
|
|
|
|
# 5. Feedback
|
|
if gesamt > 0:
|
|
msg = f"Ich habe {gesamt} Füllwörter gehört. Versuche flüssiger zu sprechen."
|
|
img = "e_DisorientedConfused.jpg"
|
|
else:
|
|
msg = "Hervorragend! Keine Füllwörter gefunden."
|
|
img = "e_Joy.jpg"
|
|
|
|
requests.post(f"http://{MISTY_IP}/api/images/display", json={"FileName": img})
|
|
requests.post(f"http://{MISTY_IP}/api/tts/speak", json={"text": msg, "speechLocale": "de-DE"})
|
|
|
|
print(f"Fertig! Text: {text}")
|
|
laeuft_gerade = False
|
|
|
|
def on_message(ws, message):
|
|
global laeuft_gerade
|
|
data = json.loads(message)
|
|
|
|
# Wir prüfen, ob das Event vom Bumper kommt
|
|
if "message" in data and "sensor" in data["message"]:
|
|
sensor = data["message"]["sensor"]
|
|
is_pressed = data["message"]["isPressed"]
|
|
|
|
# "br" steht für Bumper Right (Rechter Bumper)
|
|
if sensor == "br" and is_pressed and not laeuft_gerade:
|
|
# Starte den Prozess in einem eigenen Thread, damit die Verbindung nicht blockiert
|
|
threading.Thread(target=coaching_prozess).start()
|
|
|
|
def on_open(ws):
|
|
print("Verbindung zu Misty hergestellt. Drücke den RECHTEN BUMPER zum Starten.")
|
|
# Wir abonnieren das Bumper-Event
|
|
subscribe_msg = {
|
|
"Operation": "subscribe",
|
|
"Type": "BumpSensor",
|
|
"DebounceMs": 50,
|
|
"EventName": "BumperPress",
|
|
"ReturnProperty": None
|
|
}
|
|
ws.send(json.dumps(subscribe_msg))
|
|
|
|
# WebSocket starten
|
|
ws = websocket.WebSocketApp(f"ws://{MISTY_IP}/pubsub", on_open=on_open, on_message=on_message)
|
|
ws.run_forever()
|