Zendure Superbase V4600 - MQTT - PYTHON

Python - MQTT - Get_API_Key


import requests

# API endpoint and authentication
BASE_URL = "http://192.168.178.111/api/v1"     # the IP address of your Superbase
API_KEY  = "VX1XXXYYYZ77711"                   # "your Superbase serial number"

# Headers sent with the request: the API key goes in as a bearer token
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# Fetch data
def get_superbase_data(timeout=10):
    """Fetch the current status from the Superbase HTTP API and print it.

    Sends a GET request to ``{BASE_URL}/status`` using the module-level
    ``headers``.

    Args:
        timeout: Seconds to wait for the device to answer. Without a timeout
            ``requests`` waits indefinitely when the device is unreachable.

    Returns:
        The decoded JSON body, or ``None`` when the request failed, the
        server answered with an error status, or the body was not valid JSON.
    """
    try:
        response = requests.get(
            f"{BASE_URL}/status", headers=headers, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers older requests releases, where a non-JSON body
        # raises a plain JSON decode error instead of a RequestException.
        print(f"Fehler beim Abrufen der Daten: {e}")
        return None

    print("SuperBase V Daten:", data)
    return data

# Call the function (runs one status request as soon as the script is executed)
get_superbase_data()
      


Python - MQTT - Get_Data from Superbase V4600


import tkinter as tk
import paho.mqtt.client as mqtt
import threading
import datetime
import csv

# MQTT setup
MQTT_HOST = "mqtt-eu.zen-iot.com"
# NOTE(review): 1883 is conventionally the unencrypted MQTT port, so the
# credentials below would travel in clear text - confirm whether TLS is offered.
MQTT_PORT = 1883
USERNAME  = "z6Xx999x"                           # your API key
PASSWORD  = "X5XX555555555XXXXX5XXX555XX55555"   # your password or secret number
TOPIC_SUB = "z6Xx999x/F0J126nu/state"            # the "question" asked of the machine: the topic on_connect subscribes to

# experimental
TOPIC_READ_PROP = "z6Xx999x/F0J126nu/properties/read"   # target of request_all()
TOPIC_PUB = "z6Xx999x/F0J126nu/state"                   # target of send_command()

# Global widgets
# labels1: description column, labels2: value column,
# labels3: unit column, labels4: time-of-last-update column (filled by setup_gui).
labels1, labels2, labels3, labels4 = [], [], [], []
# lbl1/lbl2: remaining input/output time in hours, lbl3: energy estimate,
# entry: text field for manual commands. All are assigned in setup_gui().
lbl1, lbl2, lbl3, entry = None, None, None, None

def create_label(root, text, font, bg, width, x, y):
    """Create a ``tk.Label`` in *root* and pin it at pixel position (x, y).

    Args:
        root: Parent widget the label is created in.
        text: Initial label text.
        font: Font specification passed straight to ``tk.Label``.
        bg: Background colour name.
        width: Label width as understood by ``tk.Label``.
        x: Horizontal position handed to ``place``.
        y: Vertical position handed to ``place``.

    Returns:
        The newly created and placed label.
    """
    widget = tk.Label(root, text=text, font=font, bg=bg, width=width)
    widget.place(x=x, y=y)
    return widget

def setup_gui():
    """Build the monitor window and fill the module-level widget containers.

    Appends twelve description labels to ``labels1`` and twelve value labels
    to ``labels2``, eleven unit labels to ``labels3`` and eleven timestamp
    labels to ``labels4``, and assigns the globals ``lbl1``, ``lbl2``,
    ``lbl3`` and ``entry``.

    Returns:
        The ``tk.Tk`` root window; the caller is expected to run its main loop.
    """
    global lbl1, lbl2, lbl3, entry

    window = tk.Tk()
    window.title("Zendure V4600 Monitor")
    window.geometry("680x700")

    mono = ("Consolas", 14)
    col_name, col_value, col_unit, col_extra, col_time = 5, 230, 330, 440, 560
    row_pitch = 30 + 10  # label height plus vertical padding

    # One row per property; the twelfth row (date/time) has no unit or
    # timestamp label.
    for row in range(12):
        top = row * row_pitch
        labels1.append(create_label(window, "-", mono, "lightgray", 20, col_name, top))
        labels2.append(create_label(window, "-", mono, "lightblue", 8, col_value, top))
        if row >= 11:
            continue
        labels3.append(create_label(window, "-", mono, "lightgray", 10, col_unit, top))
        labels4.append(create_label(window, "-", mono, "yellow", 10, col_time, top))

    # Extra read-outs written by log(): remaining hours and the energy estimate.
    lbl1 = create_label(window, "-", mono, "lightgreen", 10, col_extra, 240)
    lbl2 = create_label(window, "-", mono, "lightgreen", 10, col_extra, 280)
    lbl3 = create_label(window, "-", mono, "lightblue", 50, 100, 600)

    entry = tk.Entry(window, font=mono, width=40)
    entry.place(x=150, y=530)

    send_button = tk.Button(window, text=" Send Command ", command=send_command)
    send_button.place(x=10, y=530)
    request_button = tk.Button(window, text=" Request All Data ", command=request_all)
    request_button.place(x=10, y=500)

    # Static captions for the description and unit columns.
    captions = (
        "Input Power", "DC Input Power", "Output Power", "AC Output Power",
        "AC Input Voltage", "AC Output Voltage", "remaining In Time", "remaining Out Time",
        "electric Level", "Fan State", "Input Mode", "Date Time",
    )
    units = ("Watt", "Watt", "Watt", "Watt", "Volt", "Volt", "Minutes", "Minutes", "%")

    for label, caption in zip(labels1, captions):
        label.config(text=caption)
    for label, unit in zip(labels3, units):
        label.config(text=unit)

    return window

def save_to_csv(timestamp, key, value):
    """Append one ``timestamp, key, value`` row to ``data_log.csv``.

    The file is opened in append mode in the current working directory and
    is created if it does not exist. It is written as UTF-8 explicitly so
    that non-ASCII values do not fail on platforms whose default encoding
    cannot represent them.

    Args:
        timestamp: Timestamp text for the first column.
        key: Property name for the second column.
        value: Property value for the third column.
    """
    with open("data_log.csv", mode="a", newline="", encoding="utf-8") as file:
        csv.writer(file).writerow([timestamp, key, value])

def on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: subscribe once the broker accepts us.

    Args:
        client: The MQTT client instance that connected.
        userdata: User data set on the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; ``0`` means the connection was accepted.
    """
    if rc != 0:
        # A non-zero result means the broker refused the connection (for
        # example bad credentials); subscribing would be pointless and the
        # "Connected" message would be misleading.
        log(f"Connection to {MQTT_HOST} refused (rc={rc})")
        return

    client.subscribe(TOPIC_SUB)
    log(f"Connected to {MQTT_HOST}, subscribed to {TOPIC_SUB}")

def on_message(client, userdata, msg):
    """paho-mqtt message callback: archive the raw payload and display it.

    The payload text is appended to ``logfile.txt`` together with a
    timestamp and then handed to ``log`` for parsing and display.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data set on the client (unused).
        msg: The received message; only ``msg.payload`` is read.
    """
    timestamp = datetime.datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
    # Replace undecodable bytes instead of raising: an exception here would
    # propagate out of the MQTT callback.
    message = msg.payload.decode("utf-8", errors="replace")
    # Explicit encoding so the logfile does not depend on the platform default.
    with open("logfile.txt", "a", encoding="utf-8") as f:
        f.write(f"{message}   {timestamp}\n")
    log(message)

op = 0  # most recently reported output power in watts, kept between log() calls

# Payload key -> row index of the value/timestamp labels in the GUI.
_LABEL_ROWS = {
    "inputPower": 0, "dcInputPower": 1, "outputPower": 2, "acOutputPower": 3,
    "acInputVoltage": 4, "acOutputVoltage": 5, "remainInputTime": 6,
    "remainOutTime": 7, "electricLevel": 8, "electricFanState": 9, "dcInputMode": 10,
}

# Divisor of the percentage estimate; presumably the V4600's battery
# capacity in watt-hours - TODO confirm against the device specification.
_CAPACITY_WH = 4600


def log(message):
    """Print *message*, record its ``key: value`` pairs and refresh the GUI.

    The message is stripped of quotes and braces and split on commas; every
    fragment that contains a colon is treated as ``key: value``. Each pair is
    appended to the CSV log via ``save_to_csv`` - this includes status texts
    from other callers that happen to contain a colon. Known keys also
    update their row in the value and timestamp columns.

    NOTE(review): this is called from the MQTT callbacks as well as from
    button handlers and touches Tk widgets directly - confirm that the Tk
    build in use tolerates calls from a non-GUI thread.

    Args:
        message: Text to log, typically a flat JSON object as a string.
    """
    global op
    print(message)
    now = datetime.datetime.now()
    time_str = now.strftime("%H.%M.%S")
    date_str = now.strftime("%Y.%m.%d_%H.%M.%S")

    cleaned = message.replace('"', '').replace('{', '').replace('}', '')
    for item in cleaned.split(","):
        if ":" not in item:
            continue  # Skip malformed entries

        try:
            key, value = item.split(":", 1)
            key = key.strip()
            value = value.strip()
            save_to_csv(date_str, key, value)

            idx = _LABEL_ROWS.get(key)
            if idx is None:
                continue  # Not a property shown in the GUI

            if idx < len(labels2) and idx < len(labels4):
                labels2[idx].config(text=value)
                labels4[idx].config(text=time_str)

            if key in ("outputPower", "acOutputPower"):
                op = int(value)
            elif key == "remainInputTime":
                lbl1.config(text=f"{int(int(value)/60)} Hours")
            elif key == "remainOutTime":
                lbl2.config(text=f"{int(int(value)/60)} Hours")
                hours_left = int(value) / 60
                # Watts multiplied by hours gives watt-hours, not kWh.
                energy_wh = op * hours_left
                percent = (100 / _CAPACITY_WH) * energy_wh
                lbl3.config(text=f"{int(energy_wh)} Wh   {int(percent)} %")

        except Exception as e:
            # Best effort: one bad fragment must not stop the rest of the
            # message from being processed.
            print(f"⚠️ Error parsing item '{item}': {e}")

    # Same bounds check as for the per-property rows above, so logging
    # before the GUI exists does not raise.
    if len(labels2) > 11:
        labels2[11].config(text=date_str, width=43)

def request_all():
    """Publish a ``getAll`` property read request and log that it was sent."""
    payload = '{"properties": ["getAll"]}'
    client.publish(TOPIC_READ_PROP, payload)
    log("Requested: getAll")

def send_command():
    """Publish the text typed into the entry field, then clear the field.

    Does nothing when the entry field is empty.
    """
    command_text = entry.get()
    if not command_text:
        return

    client.publish(TOPIC_PUB, command_text)
    log(f"Sent command: {command_text}")
    entry.delete(0, tk.END)

def start_mqtt():
    """Configure the shared MQTT client, connect, and start its network loop.

    Registers the connect and message callbacks and the credentials on the
    module-level ``client`` before connecting with a 60 second keepalive.
    """
    client.on_message = on_message
    client.on_connect = on_connect
    client.username_pw_set(USERNAME, PASSWORD)
    client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
    client.loop_start()

# Start everything
# The client must exist before start_mqtt() and the button handlers use it.
client = mqtt.Client()
# Build the window first so the widget globals are populated before any
# MQTT message is logged.
root = setup_gui()
# Connect in a daemon thread so a slow connect does not delay the GUI and
# the thread does not keep the process alive after the window closes.
threading.Thread(target=start_mqtt, daemon=True).start()
# Blocks until the window is closed.
root.mainloop()
      

Das alles geht sicher eleganter, schöner und schneller. Für meine Testzwecke war dies bisher aber völlig ausreichend.
Wenn alles zur Zufriedenheit läuft, mache ich hier und da vielleicht noch ein paar Änderungen.
Wenn Zendure mir ein paar nähere Details liefern sollte, werden die auch noch eingebaut.
Bis dahin, viel Spass mit dem, was wir haben.