ZENDURE POWERSTATION SUPERBASE V4600
a python program to show some important values
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import datetime
import csv
import json
import socket
import itertools
from typing import Any, Dict, List, Optional, Union
import os
from pathlib import Path
import time
import paho.mqtt.client as mqtt
# =========================
# Shared utils
# =========================
def now_time_str():
return datetime.datetime.now().strftime("%H.%M.%S")
def now_stamp_str():
return datetime.datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
# === Daily log paths ===
LOG_DIR = Path("logs")
DATA_DIR = Path("data")
LOG_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)
def daily_paths():
"""Return (csv_path, log_path) for today's date."""
d = datetime.date.today().strftime("%Y-%m-%d")
return (
DATA_DIR / f"data_{d}.csv",
LOG_DIR / f"log_{d}.txt",
)
# =========================
# --- ZENDURE (MQTT) ---
# =========================
MQTT_HOST = "mqtt-eu.zen-iot.com"
MQTT_PORT = 1883
USERNAME = "XYZw123b" # replace with your value
PASSWORD = "E5EA123456789BBFAB3ABD123CC12345" # replace with your value
TOPIC_SUB = "XYZw123b/F0J126nu/state" # replace with your value
# =========================
class ZendureFrame(ttk.LabelFrame):
"""V4600 monitor with CSV logging; thread-safe updates via root.after."""
def __init__(self, master: tk.Misc):
super().__init__(master, text="Zendure V4600 Monitor")
for c in range(4):
self.grid_columnconfigure(c, weight=1)
self.labels_value: List[ttk.Label] = []
self.labels_unit: List[ttk.Label] = []
self.labels_time: List[ttk.Label] = []
texts1 = [
"Input Power", "DC Input Power", "Output Power", "AC Output Power",
"AC Input Voltage", "AC Output Voltage", "Remaining In Time",
"Remaining Out Time", "Electric Level", "Fan State", "Input Mode",
"Date Time"
]
texts_unit = [
"Watt", "Watt", "Watt", "Watt",
"Volt", "Volt", "Minutes", "Minutes",
"%", "", "", ""
]
for i, name in enumerate(texts1):
ttk.Label(self, text=name, width=22).grid(row=i, column=0, sticky="w", padx=(8,6), pady=2)
v = ttk.Label(self, text="-", width=14, relief="groove", anchor="w")
v.grid(row=i, column=1, sticky="we", padx=(0,6), pady=2)
u = ttk.Label(self, text=texts_unit[i], width=10, anchor="w")
u.grid(row=i, column=2, sticky="w", padx=(0,6), pady=2)
t = ttk.Label(self, text="-", width=14, anchor="w")
t.grid(row=i, column=3, sticky="w", padx=(0,6), pady=2)
self.labels_value.append(v)
self.labels_unit.append(u)
self.labels_time.append(t)
# extras
self.rem_in_hours = ttk.Label(self, text="-", width=18, relief="groove", anchor="center")
self.rem_out_hours = ttk.Label(self, text="-", width=18, relief="groove", anchor="center")
self.kwh_estimate = ttk.Label(self, text="-", width=54, relief="groove", anchor="w")
self.rem_in_hours.grid(row=6, column=3, sticky="we", padx=(0,6), pady=2)
self.rem_out_hours.grid(row=7, column=3, sticky="we", padx=(0,6), pady=2)
ttk.Label(self, text="Energy Est.:").grid(row=12, column=0, sticky="w", padx=(8,6), pady=(6,2))
self.kwh_estimate.grid(row=12, column=1, columnspan=3, sticky="we", padx=(0,6), pady=(6,2))
# status line
self.status_label = ttk.Label(self, text="MQTT: waiting to start…", relief="sunken", anchor="w")
self.status_label.grid(row=98, column=0, columnspan=4, sticky="we", padx=6, pady=(6,0))
# state
self._last_output_power_w = 0
self._mqtt = None
self._mapping = {
"inputPower": 0, "dcInputPower": 1, "outputPower": 2, "acOutputPower": 3,
"acInputVoltage": 4, "acOutputVoltage": 5, "remainInputTime": 6,
"remainOutTime": 7, "electricLevel": 8, "electricFanState": 9, "dcInputMode": 10
}
def start(self, root: tk.Tk):
"""Create and start MQTT client non-blocking."""
try:
self.status_label.config(text="MQTT: connecting…")
self._mqtt = mqtt.Client()
self._mqtt.username_pw_set(USERNAME, PASSWORD)
self._mqtt.on_connect = lambda c, u, f, rc: self._on_connect(root, rc)
self._mqtt.on_message = lambda c, u, msg: self._on_message(root, msg)
# Non-blocking connect so the Tk window shows immediately
self._mqtt.connect_async(MQTT_HOST, MQTT_PORT, 60)
self._mqtt.loop_start()
except Exception as e:
self.status_label.config(text=f"MQTT error: {e}")
def stop(self):
try:
if self._mqtt:
self._mqtt.loop_stop()
self._mqtt.disconnect()
except Exception:
pass
def _on_connect(self, root: tk.Tk, rc: int):
try:
self._mqtt.subscribe(TOPIC_SUB)
root.after(0, lambda: self.status_label.config(text="MQTT: connected, subscribed"))
root.after(0, lambda: self._set_status_line(11, now_stamp_str()))
except Exception as e:
root.after(0, lambda: self.status_label.config(text=f"MQTT subscribe error: {e}"))
def _on_message(self, root: tk.Tk, msg):
message = msg.payload.decode()
try:
_, log_path = daily_paths()
with open(log_path, "a", encoding="utf-8") as f:
f.write(f"{now_stamp_str()} {message}\n")
except Exception:
pass
root.after(0, lambda: self.status_label.config(text="MQTT: message received"))
self._parse_and_update(root, message)
def _set_value(self, idx: int, value: str):
if 0 <= idx < len(self.labels_value):
self.labels_value[idx].config(text=value)
def _set_time(self, idx: int, when: str):
if 0 <= idx < len(self.labels_time):
self.labels_time[idx].config(text=when)
def _set_status_line(self, idx: int, text: str):
self._set_value(idx, text)
def _save_to_csv(self, key: str, value: str):
try:
csv_path, _ = daily_paths()
new_file = not csv_path.exists()
with open(csv_path, mode="a", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
if new_file:
writer.writerow(["timestamp", "key", "value"])
writer.writerow([now_stamp_str(), key, value])
except Exception:
pass
def _parse_and_update(self, root: tk.Tk, message: str):
cleaned = message.replace('"', '').replace('{', '').replace('}', '')
kv_pairs = []
for item in cleaned.split(","):
if ":" in item:
k, v = item.split(":", 1)
kv_pairs.append((k.strip(), v.strip()))
def apply_updates():
for key, value in kv_pairs:
self._save_to_csv(key, value)
if key in self._mapping:
idx = self._mapping[key]
self._set_value(idx, value)
self._set_time(idx, now_time_str())
try:
if key in ("outputPower", "acOutputPower"):
self._last_output_power_w = int(float(value))
if key == "remainInputTime":
hrs = max(0, int(int(value) / 60))
self.rem_in_hours.config(text=f"{hrs} Hours")
if key == "remainOutTime":
hrs = max(0, int(int(value) / 60))
self.rem_out_hours.config(text=f"{hrs} Hours")
ro_hours = int(value) / 60.0
wh = self._last_output_power_w * ro_hours
kwh_wh = int(wh)
pct = int((100/4600.0) * wh) if wh >= 0 else 0
self.kwh_estimate.config(text=f"{kwh_wh} Wh ~ {pct} % of 4.6 kWh")
except Exception:
pass
self._set_status_line(11, now_stamp_str())
root.after(0, apply_updates)
# =========================
# --- PICO (JSON-RPC) ---
# =========================
class JSONRPCError(Exception):
def __init__(self, code: int, message: str, data: Any = None):
super().__init__(f"JSON-RPC Error {code}: {message}")
self.code = code
self.message = message
self.data = data
class JSONRPCClient:
def __init__(self, host: str = "192.168.178.192", port: int = 8080, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._ids = itertools.count(1)
def call(self, method: str, params: Optional[Union[List[Any], Dict[str, Any]]] = None,
request_id: Optional[Union[int, str]] = None) -> Any:
if request_id is None:
request_id = next(self._ids)
req = {"jsonrpc": "2.0", "method": method, "params": params if params is not None else [], "id": request_id}
raw = json.dumps(req)
resp = self._send_and_recv(raw)
if not isinstance(resp, dict):
raise RuntimeError(f"Unexpected response type: {type(resp)} - {resp!r}")
if "error" in resp and resp["error"] is not None:
err = resp["error"]
raise JSONRPCError(err.get("code", -32000), err.get("message", "Unknown error"), err.get("data"))
return resp.get("result")
def _send_and_recv(self, payload: str) -> Any:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(self.timeout)
s.connect((self.host, self.port))
s.sendall(payload.encode("utf-8"))
chunks = []
while True:
try:
chunk = s.recv(4096)
except socket.timeout:
break
if not chunk:
break
chunks.append(chunk)
data = b"".join(chunks).decode("utf-8", errors="replace").strip()
if not data:
raise RuntimeError("Empty response from server")
try:
return json.loads(data)
except json.JSONDecodeError as e:
raise RuntimeError(f"Invalid JSON in response: {e}\nRaw: {data!r}") from e
# convenience
def gettemp(self) -> Dict[str, Any]: return self.call("gettemp")
def gethumidity(self) -> Dict[str, Any]: return self.call("gethumidity")
def converter(self, state: int = 0) -> Dict[str, Any]: return self.call("converter", [state])
def voltages(self) -> Dict[str, Any]: return self.call("voltages")
def acinput(self, state: int = 0) -> Dict[str, Any]: return self.call("acinput", [state])
def acoutput(self, state: int = 0) -> Dict[str, Any]: return self.call("acoutput", [state])
def get_time(self) -> Dict[str, Any]: return self.call("get_time")
POLL_MS = 2000
omin = 0
# using a raspberry pico, we check internal temperature, room temperature, voltages of the solar panels
# -----------------------------------------------------------------------------------------------------
class PicoFrame(ttk.LabelFrame):
def __init__(self, master: tk.Misc, client: JSONRPCClient):
super().__init__(master, text="Pico Control Panel")
self.client = client
for c in range(3): self.grid_columnconfigure(c, weight=1)
# readings
box = ttk.LabelFrame(self, text="Readings")
box.grid(row=0, column=0, columnspan=3, sticky="we", padx=6, pady=(4,8))
grid = ttk.Frame(box, padding=8)
grid.pack(fill="x")
self.temp_i_var = tk.StringVar(value="--")
self.temp_e_var = tk.StringVar(value="--")
self.hum_var = tk.StringVar(value="--")
self.status_txt_var = tk.StringVar(value="--")
self.volt1_var = tk.StringVar(value="--")
self.volt2_var = tk.StringVar(value="--")
self.volt48_var = tk.StringVar(value="--")
self.time_var = tk.StringVar(value="--")
def row(r, label, var):
ttk.Label(grid, text=label, width=18).grid(row=r, column=0, sticky="w", padx=(0,8), pady=3)
ttk.Label(grid, textvariable=var, width=38, relief="groove", anchor="w").grid(row=r, column=1, sticky="we", pady=3)
row(0, "Temp (internal):", self.temp_i_var)
row(1, "Temp (external):", self.temp_e_var)
row(2, "Humidity:", self.hum_var)
row(3, "Status:", self.status_txt_var)
row(4, "Panel 1 (V):", self.volt1_var)
row(5, "Panel 2 (V):", self.volt2_var)
row(6, "48 Volt (V):", self.volt48_var)
row(7, "Time:", self.time_var)
# controls
controls = ttk.LabelFrame(self, text="Outputs")
controls.grid(row=1, column=0, columnspan=3, sticky="we", padx=6, pady=(0,6))
self.btn_converter = ttk.Button(controls, text="Converter: OFF", command=self.toggle_converter)
self.btn_acinput = ttk.Button(controls, text="AC Input: OFF", command=self.toggle_acinput)
self.btn_acoutput = ttk.Button(controls, text="AC Output: OFF", command=self.toggle_acoutput)
self.btn_converter.grid(row=0, column=0, padx=8, pady=8, sticky="we")
self.btn_acinput.grid(row=0, column=1, padx=8, pady=8, sticky="we")
self.btn_acoutput.grid(row=0, column=2, padx=8, pady=8, sticky="we")
for i in range(3): controls.grid_columnconfigure(i, weight=1)
# status bar
self.status_var = tk.StringVar(value="Pico: polling…")
status = ttk.Label(self, textvariable=self.status_var, relief="sunken", anchor="w")
status.grid(row=99, column=0, columnspan=3, sticky="we", padx=6, pady=(8,0))
# polling
self._stop = False
self.after(250, self.poll)
def poll(self):
if self._stop: return
threading.Thread(target=self._worker_poll, daemon=True).start()
self.after(POLL_MS, self.poll)
def _worker_poll(self):
try:
temp = self.client.gettemp()
hum = self.client.gethumidity()
volts = self.client.voltages()
#now = self.client.get_time()
def update():
global omin
t1 = temp.get("temperature_i")
t2 = temp.get("temperature_e")
self._set_num(self.temp_i_var, t1 )
self._set_num(self.temp_e_var, t2)
self.hum_var.set(f"{hum.get('humidity_pct', '--')} %")
v1 = volts.get("panel1")*45
v2 = volts.get("panel2")*43
v3 = volts.get("48Volt")*30
self._set_num(self.volt1_var, v1, 3)
self._set_num(self.volt2_var, v2, 3)
self._set_num(self.volt48_var, v3, 3)
#self.time_var.set(now.get("local_time", "--"))
#self.status_var.set("Pico: OK")
nowx = datetime.datetime.now()
print(nowx.strftime("%Y-%m-%d %H:%M:%S"))
years = time.localtime()[0]
months = time.localtime()[1]
days = time.localtime()[2]
hours = time.localtime()[3]
minutes = time.localtime()[4]
if (omin != minutes) and (minutes % 5 == 0):
omin = minutes
fx = open('datafile.csv','a')
fx.write( f"data;{t1:.2f};{t2:.2f};{v1:.2f};{v2:.2f};{v3:.2f};{years};{months};{days};{hours};{minutes} \n")
fx.close()
print(minutes)
print("-----")
#fx.write(str(t1)+';'+str(t2)+';'+str(v1)+';'+str(v2)+';'+str(v3)+';end\n')
#fx.write( f"data;{t1:.2f};{t2:.2f};{v1:.2f};{v2:.2f};{v3:.2f} \n")
self.after(0, update)
except JSONRPCError as e:
self.after(0, lambda: self.status_var.set(f"Server error: {e}"))
except Exception as e:
#passself.after(0, lambda: self.status_var.set(f"Client error: {e}"))
pass
def _set_num(self, var: tk.StringVar, v, nd=2):
if isinstance(v, (int, float)):
var.set(f"{v:.{nd}f}")
else:
var.set(str(v))
# the buttons are planned to switch components arround our powerstation ON or OFF
# AC Input, another external powerinput . . .
def toggle_converter(self):
self._call_toggle("converter", self.btn_converter,
lambda s: f"Converter: {'ON' if s else 'OFF'}",
self.client.converter)
def toggle_acinput(self):
self._call_toggle("acinput", self.btn_acinput,
lambda s: f"AC Input: {'ON' if s else 'OFF'}",
self.client.acinput)
def toggle_acoutput(self):
self._call_toggle("acoutput", self.btn_acoutput,
lambda s: f"AC Output: {'ON' if s else 'OFF'}",
self.client.acoutput)
def _call_toggle(self, which: str, button: ttk.Button, label_fmt, rpc_func):
new_state = 1 if "OFF" in button.cget("text") else 0
button.state(["disabled"])
def worker():
try:
rpc_func(new_state)
def ok():
button.config(text=label_fmt(new_state))
self.status_var.set(f"{which} -> {new_state} OK")
button.state(["!disabled"])
self.after(0, ok)
except JSONRPCError as e:
def err():
button.state(["!disabled"])
self.status_var.set(f"Server error: {e}")
messagebox.showerror("JSON-RPC", str(e))
self.after(0, err)
except Exception as e:
def err2():
button.state(["!disabled"])
self.status_var.set(f"Client error: {e}")
messagebox.showerror("Client error", str(e))
self.after(0, err2)
threading.Thread(target=worker, daemon=True).start()
def stop(self):
self._stop = True
# =========================
# --- MAIN WINDOW ---
# =========================
class MainApp:
def __init__(self, root: tk.Tk, pico_client: JSONRPCClient):
self.root = root
root.title("Unified Control – Zendure V4600 + Pico")
root.geometry("980x520")
root.minsize(900, 480)
# Theme fallback sequence (Windows-friendly first)
try:
for t in ("vista", "xpnative", "clam", "default"):
ttk.Style().theme_use(t); break
except Exception:
pass
container = ttk.Frame(root, padding=10)
container.pack(fill="both", expand=True)
container.grid_columnconfigure(0, weight=1)
container.grid_columnconfigure(1, weight=1)
container.grid_rowconfigure(0, weight=1)
# Left: Zendure
self.zendure = ZendureFrame(container)
self.zendure.grid(row=0, column=0, sticky="nsew", padx=(0,8))
# Right: Pico
self.pico = PicoFrame(container, pico_client)
self.pico.grid(row=0, column=1, sticky="nsew", padx=(8,0))
# Start MQTT shortly after the window is visible (non-blocking)
root.after(100, self.zendure.start, root)
root.protocol("WM_DELETE_WINDOW", self.on_close)
def on_close(self):
try:
self.pico.stop()
self.zendure.stop()
finally:
self.root.destroy()
if __name__ == "__main__":
# adjust host/port if your Pico server differs
pico_client = JSONRPCClient(host="192.168.178.192", port=8080, timeout=2.0) # replace with your value
root = tk.Tk()
MainApp(root, pico_client)
root.mainloop()