Welcome

ZENDURE POWERSTATION SUPERBASE V4600 . . . . . SERVICE of ZENDURE

service : seems this is an unknown topic for the company

List of errors :

Problem 1 : after performind a half successfull software update DC charging
via the Solar panels doesn't work any longer
a day later (after receiving no answer) I tried to press the AC and the LED button at the same time
this reboots the system, after this the system behaves normal

Problem 2 : system setting is stop discharging at a value of 10%
the system continues to discharge. At 1% I stopped this test

Problem 3 : charging the system to the max. value of 100%, it disconnects the AC Output.
this is ok for a light bulb, but not so for a connected and running PC
now the system is set to 93% and stops charging at this value

Problem 4 : set the AC charge value to 1000 Watt, the system chooses to charge at 750 Watt
changing the setting shows it always charges with 250 Watt less

Problem 5 : connect 48 V DC to the XT90 input -> result : nothing
the user manual states : XT90 input 12 - 150 Volt DC

We informed Zendure about all errors - > result : nothing
except : we are sorry that you face such problems


Since we like to automate the system we choose to connect a Raspberry Pico w3
to control certain settings, but so far we are only able to read values via the MQTT protocol.
Zendure doesn't provide the command sequences to set values via an external program.
saying we should consult the official Zendure GitHub forum.

ZENDURE POWERSTATION SUPERBASE V4600

a python program to request the developer infos


import json
import requests
import paho.mqtt.client as mqtt

sSerialNumber = "VE4ABCDEFG12345"               # replace with your value
sAccount      = "bernie.nobody@x-noline.com"    # replace with your value

url = "https://app.zendure.tech/eu/developer/api/apply"
payload = {
    "snNumber": sSerialNumber,
    "account": sAccount
}
headers = {"Content-Type": "application/json"}
response = requests.post(url, data=json.dumps(payload), headers=headers)
data = response.json()
if data.get("success"):
    print(data)
    print("-------------------------------------------------------")
else:
    raise Exception(f"Failed to get credentials: {data.get('msg')}")
	

ZENDURE POWERSTATION SUPERBASE V4600

a python program to show some important values


import tkinter as tk
from tkinter import ttk, messagebox
import threading
import datetime
import csv
import json
import socket
import itertools
from typing import Any, Dict, List, Optional, Union
import os
from pathlib import Path
import time
import paho.mqtt.client as mqtt

# =========================
#  Shared utils
# =========================

def now_time_str():
    return datetime.datetime.now().strftime("%H.%M.%S")

def now_stamp_str():
    return datetime.datetime.now().strftime("%Y.%m.%d_%H.%M.%S")

# === Daily log paths ===
LOG_DIR = Path("logs")
DATA_DIR = Path("data")
LOG_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)

def daily_paths():
    """Return (csv_path, log_path) for today's date."""
    d = datetime.date.today().strftime("%Y-%m-%d")
    return (
        DATA_DIR / f"data_{d}.csv",
        LOG_DIR / f"log_{d}.txt",
    )
    
# =========================
#  --- ZENDURE (MQTT) ---
# =========================

MQTT_HOST = "mqtt-eu.zen-iot.com"
MQTT_PORT = 1883
USERNAME  = "XYZw123b"                             # replace with your value
PASSWORD  = "E5EA123456789BBFAB3ABD123CC12345"     # replace with your value
TOPIC_SUB = "XYZw123b/F0J126nu/state"              # replace with your value


# =========================

class ZendureFrame(ttk.LabelFrame):
    """V4600 monitor with CSV logging; thread-safe updates via root.after."""
    def __init__(self, master: tk.Misc):
        super().__init__(master, text="Zendure V4600 Monitor")
        for c in range(4):
            self.grid_columnconfigure(c, weight=1)

        self.labels_value: List[ttk.Label] = []
        self.labels_unit: List[ttk.Label] = []
        self.labels_time: List[ttk.Label] = []

        texts1 = [
            "Input Power", "DC Input Power", "Output Power", "AC Output Power",
            "AC Input Voltage", "AC Output Voltage", "Remaining In Time",
            "Remaining Out Time", "Electric Level", "Fan State", "Input Mode",
            "Date Time"
        ]
        texts_unit = [
            "Watt", "Watt", "Watt", "Watt",
            "Volt", "Volt", "Minutes", "Minutes",
            "%", "", "", ""
        ]

        for i, name in enumerate(texts1):
            ttk.Label(self, text=name, width=22).grid(row=i, column=0, sticky="w", padx=(8,6), pady=2)
            v = ttk.Label(self, text="-", width=14, relief="groove", anchor="w")
            v.grid(row=i, column=1, sticky="we", padx=(0,6), pady=2)
            u = ttk.Label(self, text=texts_unit[i], width=10, anchor="w")
            u.grid(row=i, column=2, sticky="w", padx=(0,6), pady=2)
            t = ttk.Label(self, text="-", width=14, anchor="w")
            t.grid(row=i, column=3, sticky="w", padx=(0,6), pady=2)
            self.labels_value.append(v)
            self.labels_unit.append(u)
            self.labels_time.append(t)

        # extras
        self.rem_in_hours = ttk.Label(self, text="-", width=18, relief="groove", anchor="center")
        self.rem_out_hours = ttk.Label(self, text="-", width=18, relief="groove", anchor="center")
        self.kwh_estimate = ttk.Label(self, text="-", width=54, relief="groove", anchor="w")
        self.rem_in_hours.grid(row=6, column=3, sticky="we", padx=(0,6), pady=2)
        self.rem_out_hours.grid(row=7, column=3, sticky="we", padx=(0,6), pady=2)
        ttk.Label(self, text="Energy Est.:").grid(row=12, column=0, sticky="w", padx=(8,6), pady=(6,2))
        self.kwh_estimate.grid(row=12, column=1, columnspan=3, sticky="we", padx=(0,6), pady=(6,2))

        # status line
        self.status_label = ttk.Label(self, text="MQTT: waiting to start…", relief="sunken", anchor="w")
        self.status_label.grid(row=98, column=0, columnspan=4, sticky="we", padx=6, pady=(6,0))

        # state
        self._last_output_power_w = 0
        self._mqtt = None
        self._mapping = {
            "inputPower": 0, "dcInputPower": 1, "outputPower": 2, "acOutputPower": 3,
            "acInputVoltage": 4, "acOutputVoltage": 5, "remainInputTime": 6,
            "remainOutTime": 7, "electricLevel": 8, "electricFanState": 9, "dcInputMode": 10
        }

    def start(self, root: tk.Tk):
        """Create and start MQTT client non-blocking."""
        try:
            self.status_label.config(text="MQTT: connecting…")
            self._mqtt = mqtt.Client()
            self._mqtt.username_pw_set(USERNAME, PASSWORD)
            self._mqtt.on_connect = lambda c, u, f, rc: self._on_connect(root, rc)
            self._mqtt.on_message = lambda c, u, msg: self._on_message(root, msg)
            # Non-blocking connect so the Tk window shows immediately
            self._mqtt.connect_async(MQTT_HOST, MQTT_PORT, 60)
            self._mqtt.loop_start()
        except Exception as e:
            self.status_label.config(text=f"MQTT error: {e}")

    def stop(self):
        try:
            if self._mqtt:
                self._mqtt.loop_stop()
                self._mqtt.disconnect()
        except Exception:
            pass

    def _on_connect(self, root: tk.Tk, rc: int):
        try:
            self._mqtt.subscribe(TOPIC_SUB)
            root.after(0, lambda: self.status_label.config(text="MQTT: connected, subscribed"))
            root.after(0, lambda: self._set_status_line(11, now_stamp_str()))
        except Exception as e:
            root.after(0, lambda: self.status_label.config(text=f"MQTT subscribe error: {e}"))

    def _on_message(self, root: tk.Tk, msg):
        message = msg.payload.decode()
        try:
            _, log_path = daily_paths()
            with open(log_path, "a", encoding="utf-8") as f:
                f.write(f"{now_stamp_str()}  {message}\n")
        except Exception:
            pass
        root.after(0, lambda: self.status_label.config(text="MQTT: message received"))
        self._parse_and_update(root, message)

    def _set_value(self, idx: int, value: str):
        if 0 <= idx < len(self.labels_value):
            self.labels_value[idx].config(text=value)

    def _set_time(self, idx: int, when: str):
        if 0 <= idx < len(self.labels_time):
            self.labels_time[idx].config(text=when)

    def _set_status_line(self, idx: int, text: str):
        self._set_value(idx, text)

    def _save_to_csv(self, key: str, value: str):
        try:
            csv_path, _ = daily_paths()
            new_file = not csv_path.exists()
            with open(csv_path, mode="a", newline="", encoding="utf-8") as file:
                writer = csv.writer(file)
                if new_file:
                    writer.writerow(["timestamp", "key", "value"])
                writer.writerow([now_stamp_str(), key, value])
        except Exception:
            pass

    def _parse_and_update(self, root: tk.Tk, message: str):
        cleaned = message.replace('"', '').replace('{', '').replace('}', '')
        kv_pairs = []
        for item in cleaned.split(","):
            if ":" in item:
                k, v = item.split(":", 1)
                kv_pairs.append((k.strip(), v.strip()))

        def apply_updates():
            for key, value in kv_pairs:
                self._save_to_csv(key, value)
                if key in self._mapping:
                    idx = self._mapping[key]
                    self._set_value(idx, value)
                    self._set_time(idx, now_time_str())
                try:
                    if key in ("outputPower", "acOutputPower"):
                        self._last_output_power_w = int(float(value))
                    if key == "remainInputTime":
                        hrs = max(0, int(int(value) / 60))
                        self.rem_in_hours.config(text=f"{hrs} Hours")
                    if key == "remainOutTime":
                        hrs = max(0, int(int(value) / 60))
                        self.rem_out_hours.config(text=f"{hrs} Hours")
                        ro_hours = int(value) / 60.0
                        wh = self._last_output_power_w * ro_hours
                        kwh_wh = int(wh)
                        pct = int((100/4600.0) * wh) if wh >= 0 else 0
                        self.kwh_estimate.config(text=f"{kwh_wh} Wh   ~ {pct} % of 4.6 kWh")
                except Exception:
                    pass
            self._set_status_line(11, now_stamp_str())

        root.after(0, apply_updates)

# =========================
#  --- PICO (JSON-RPC) ---
# =========================

class JSONRPCError(Exception):
    def __init__(self, code: int, message: str, data: Any = None):
        super().__init__(f"JSON-RPC Error {code}: {message}")
        self.code = code
        self.message = message
        self.data = data

class JSONRPCClient:
    def __init__(self, host: str = "192.168.178.192", port: int = 8080, timeout: float = 5.0):
        self.host    = host
        self.port    = port
        self.timeout = timeout
        self._ids    = itertools.count(1)

    def call(self, method: str, params: Optional[Union[List[Any], Dict[str, Any]]] = None,
             request_id: Optional[Union[int, str]] = None) -> Any:
        if request_id is None:
            request_id = next(self._ids)
        req = {"jsonrpc": "2.0", "method": method, "params": params if params is not None else [], "id": request_id}
        raw = json.dumps(req)
        resp = self._send_and_recv(raw)
        if not isinstance(resp, dict):
            raise RuntimeError(f"Unexpected response type: {type(resp)} - {resp!r}")
        if "error" in resp and resp["error"] is not None:
            err = resp["error"]
            raise JSONRPCError(err.get("code", -32000), err.get("message", "Unknown error"), err.get("data"))
        return resp.get("result")

    def _send_and_recv(self, payload: str) -> Any:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(self.timeout)
            s.connect((self.host, self.port))
            s.sendall(payload.encode("utf-8"))
            chunks = []
            while True:
                try:
                    chunk = s.recv(4096)
                except socket.timeout:
                    break
                if not chunk:
                    break
                chunks.append(chunk)
        data = b"".join(chunks).decode("utf-8", errors="replace").strip()
        if not data:
            raise RuntimeError("Empty response from server")
        try:
            return json.loads(data)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Invalid JSON in response: {e}\nRaw: {data!r}") from e

    # convenience
    def gettemp(self)                   -> Dict[str, Any]: return self.call("gettemp")
    def gethumidity(self)               -> Dict[str, Any]: return self.call("gethumidity")
    def converter(self, state: int = 0) -> Dict[str, Any]: return self.call("converter", [state])
    def voltages(self)                  -> Dict[str, Any]: return self.call("voltages")
    def acinput(self, state: int = 0)   -> Dict[str, Any]: return self.call("acinput", [state])
    def acoutput(self, state: int = 0)  -> Dict[str, Any]: return self.call("acoutput", [state])
    def get_time(self)                  -> Dict[str, Any]: return self.call("get_time")

POLL_MS = 2000
omin = 0

# using a raspberry pico, we check internal temperature, room temperature, voltages of the solar panels
# -----------------------------------------------------------------------------------------------------

class PicoFrame(ttk.LabelFrame):
    def __init__(self, master: tk.Misc, client: JSONRPCClient):
        super().__init__(master, text="Pico Control Panel")
        self.client = client
        for c in range(3): self.grid_columnconfigure(c, weight=1)

        # readings
        box = ttk.LabelFrame(self, text="Readings")
        box.grid(row=0, column=0, columnspan=3, sticky="we", padx=6, pady=(4,8))
        grid = ttk.Frame(box, padding=8)
        grid.pack(fill="x")

        self.temp_i_var = tk.StringVar(value="--")
        self.temp_e_var = tk.StringVar(value="--")
        self.hum_var   = tk.StringVar(value="--")
        self.status_txt_var = tk.StringVar(value="--")
        self.volt1_var = tk.StringVar(value="--")
        self.volt2_var = tk.StringVar(value="--")
        self.volt48_var = tk.StringVar(value="--")
        self.time_var  = tk.StringVar(value="--")

        def row(r, label, var):
            ttk.Label(grid, text=label, width=18).grid(row=r, column=0, sticky="w", padx=(0,8), pady=3)
            ttk.Label(grid, textvariable=var, width=38, relief="groove", anchor="w").grid(row=r, column=1, sticky="we", pady=3)

        row(0, "Temp (internal):", self.temp_i_var)
        row(1, "Temp (external):", self.temp_e_var)
        row(2, "Humidity:", self.hum_var)
        row(3, "Status:", self.status_txt_var)
        row(4, "Panel 1 (V):", self.volt1_var)
        row(5, "Panel 2 (V):", self.volt2_var)
        row(6, "48 Volt (V):", self.volt48_var)
        row(7, "Time:", self.time_var)

        # controls
        controls = ttk.LabelFrame(self, text="Outputs")
        controls.grid(row=1, column=0, columnspan=3, sticky="we", padx=6, pady=(0,6))

        self.btn_converter = ttk.Button(controls, text="Converter: OFF", command=self.toggle_converter)
        self.btn_acinput   = ttk.Button(controls, text="AC Input: OFF", command=self.toggle_acinput)
        self.btn_acoutput  = ttk.Button(controls, text="AC Output: OFF", command=self.toggle_acoutput)

        self.btn_converter.grid(row=0, column=0, padx=8, pady=8, sticky="we")
        self.btn_acinput.grid(row=0, column=1, padx=8, pady=8, sticky="we")
        self.btn_acoutput.grid(row=0, column=2, padx=8, pady=8, sticky="we")
        for i in range(3): controls.grid_columnconfigure(i, weight=1)

        # status bar
        self.status_var = tk.StringVar(value="Pico: polling…")
        status = ttk.Label(self, textvariable=self.status_var, relief="sunken", anchor="w")
        status.grid(row=99, column=0, columnspan=3, sticky="we", padx=6, pady=(8,0))

        # polling
        self._stop = False
        self.after(250, self.poll)

    def poll(self):
        if self._stop: return
        threading.Thread(target=self._worker_poll, daemon=True).start()
        self.after(POLL_MS, self.poll)

    def _worker_poll(self):

        try:
            temp = self.client.gettemp()
            hum  = self.client.gethumidity()
            volts = self.client.voltages()
            #now  = self.client.get_time()

            def update():
                global omin
                t1 = temp.get("temperature_i")
                t2 = temp.get("temperature_e")
                self._set_num(self.temp_i_var, t1 )
                self._set_num(self.temp_e_var, t2)
                self.hum_var.set(f"{hum.get('humidity_pct', '--')} %")
                v1 = volts.get("panel1")*45
                v2 = volts.get("panel2")*43
                v3 = volts.get("48Volt")*30
                self._set_num(self.volt1_var, v1, 3)
                self._set_num(self.volt2_var, v2, 3)
                self._set_num(self.volt48_var, v3, 3)
                #self.time_var.set(now.get("local_time", "--"))
                #self.status_var.set("Pico: OK")
                nowx = datetime.datetime.now()
                print(nowx.strftime("%Y-%m-%d %H:%M:%S"))
                years   = time.localtime()[0]
                months  = time.localtime()[1]
                days    = time.localtime()[2]
                hours   = time.localtime()[3]
                minutes = time.localtime()[4]
                if (omin != minutes) and (minutes % 5 == 0):
                    omin = minutes
                    fx = open('datafile.csv','a') 
                    fx.write( f"data;{t1:.2f};{t2:.2f};{v1:.2f};{v2:.2f};{v3:.2f};{years};{months};{days};{hours};{minutes} \n")
                    fx.close()
                    print(minutes)
                print("-----")
                #fx.write(str(t1)+';'+str(t2)+';'+str(v1)+';'+str(v2)+';'+str(v3)+';end\n')
                #fx.write( f"data;{t1:.2f};{t2:.2f};{v1:.2f};{v2:.2f};{v3:.2f} \n")
            self.after(0, update)

        except JSONRPCError as e:
            self.after(0, lambda: self.status_var.set(f"Server error: {e}"))
        except Exception as e:
            #passself.after(0, lambda: self.status_var.set(f"Client error: {e}"))
            pass

    def _set_num(self, var: tk.StringVar, v, nd=2):
        if isinstance(v, (int, float)):
            var.set(f"{v:.{nd}f}")
        else:
            var.set(str(v))
			
    # the buttons are planned to switch components arround our powerstation ON or OFF
	# AC Input, another external powerinput . . . 
	
    def toggle_converter(self):
        self._call_toggle("converter", self.btn_converter,
                          lambda s: f"Converter: {'ON' if s else 'OFF'}",
                          self.client.converter)

    def toggle_acinput(self):
        self._call_toggle("acinput", self.btn_acinput,
                          lambda s: f"AC Input: {'ON' if s else 'OFF'}",
                          self.client.acinput)

    def toggle_acoutput(self):
        self._call_toggle("acoutput", self.btn_acoutput,
                          lambda s: f"AC Output: {'ON' if s else 'OFF'}",
                          self.client.acoutput)

    def _call_toggle(self, which: str, button: ttk.Button, label_fmt, rpc_func):
        new_state = 1 if "OFF" in button.cget("text") else 0
        button.state(["disabled"])
        def worker():
            try:
                rpc_func(new_state)
                def ok():
                    button.config(text=label_fmt(new_state))
                    self.status_var.set(f"{which} -> {new_state} OK")
                    button.state(["!disabled"])
                self.after(0, ok)
            except JSONRPCError as e:
                def err():
                    button.state(["!disabled"])
                    self.status_var.set(f"Server error: {e}")
                    messagebox.showerror("JSON-RPC", str(e))
                self.after(0, err)
            except Exception as e:
                def err2():
                    button.state(["!disabled"])
                    self.status_var.set(f"Client error: {e}")
                    messagebox.showerror("Client error", str(e))
                self.after(0, err2)
        threading.Thread(target=worker, daemon=True).start()

    def stop(self):
        self._stop = True

# =========================
#  --- MAIN WINDOW ---
# =========================

class MainApp:
    def __init__(self, root: tk.Tk, pico_client: JSONRPCClient):
        self.root = root
        root.title("Unified Control – Zendure V4600 + Pico")
        root.geometry("980x520")
        root.minsize(900, 480)
        # Theme fallback sequence (Windows-friendly first)
        try:
            for t in ("vista", "xpnative", "clam", "default"):
                ttk.Style().theme_use(t); break
        except Exception:
            pass

        container = ttk.Frame(root, padding=10)
        container.pack(fill="both", expand=True)
        container.grid_columnconfigure(0, weight=1)
        container.grid_columnconfigure(1, weight=1)
        container.grid_rowconfigure(0, weight=1)

        # Left: Zendure
        self.zendure = ZendureFrame(container)
        self.zendure.grid(row=0, column=0, sticky="nsew", padx=(0,8))

        # Right: Pico
        self.pico = PicoFrame(container, pico_client)
        self.pico.grid(row=0, column=1, sticky="nsew", padx=(8,0))

        # Start MQTT shortly after the window is visible (non-blocking)
        root.after(100, self.zendure.start, root)

        root.protocol("WM_DELETE_WINDOW", self.on_close)

    def on_close(self):
        try:
            self.pico.stop()
            self.zendure.stop()
        finally:
            self.root.destroy()

if __name__ == "__main__":
    # adjust host/port if your Pico server differs
    pico_client = JSONRPCClient(host="192.168.178.192", port=8080, timeout=2.0)    # replace with your value
    root = tk.Tk()
    MainApp(root, pico_client)
    root.mainloop()