Files
rpi-tor-snowflake-panel/bin/snowpanel-enforce.py
2025-11-15 09:07:04 +01:00

227 lines
7.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import os
import sys
import time
import subprocess
from datetime import datetime, timedelta
APP_JSON_CANDIDATES = ["/etc/snowpanel/app.json", "/var/lib/snowpanel/app.json"]
LIMITS_JSON_CANDIDATES = ["/etc/snowpanel/limits.json", "/var/lib/snowpanel/limits.json"]
STATE_DIR = "/var/lib/snowpanel"
META_JSON = os.path.join(STATE_DIR, "meta.json")
STATS_JSON = os.path.join(STATE_DIR, "stats.json")
SYSTEMCTL = "/bin/systemctl"
def pretty_bytes(n: int) -> str:
n = int(n)
units = ["B", "KB", "MB", "GB", "TB"]
i = 0
f = float(n)
while f >= 1024 and i < len(units) - 1:
f /= 1024.0
i += 1
if i == 0:
return f"{int(f)} {units[i]}"
return f"{f:.2f} {units[i]}"
def load_first_json(paths):
for p in paths:
try:
with open(p, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return {}
def save_json_atomic(path, obj):
tmp = f"{path}.tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
os.replace(tmp, path)
def period_bounds(reset_day: int):
reset_day = max(1, min(28, int(reset_day or 1)))
now = datetime.now()
year = now.year
month = now.month
if now.day < reset_day:
month -= 1
if month == 0:
month = 12
year -= 1
start = datetime(year, month, reset_day, 0, 0, 0)
end_guess = start + timedelta(days=32)
end_year, end_month = end_guess.year, end_guess.month
try:
end = datetime(end_year, end_month, reset_day, 0, 0, 0)
except ValueError:
end = datetime(end_year, end_month, 1, 0, 0, 0)
return start, end
def period_label(start: datetime, end: datetime) -> str:
return f"{start.strftime('%b %-d')}{end.strftime('%b %-d')}"
def load_limits():
cfg = load_first_json(APP_JSON_CANDIDATES)
cap_gb = 0
cap_reset_day = 1
rate_mbps = 0
if cfg:
cap_gb = int(cfg.get("cap_gb", 0) or 0)
cap_reset_day = int(cfg.get("cap_reset_day", 0) or 0)
rate_mbps = int(cfg.get("rate_mbps", 0) or 0)
lim = cfg.get("limits") or {}
if cap_gb == 0: cap_gb = int(lim.get("cap_gb", 0) or 0)
if cap_reset_day == 0: cap_reset_day = int(lim.get("cap_reset_day", 0) or 0)
if rate_mbps == 0: rate_mbps = int(lim.get("rate_mbps", 0) or 0)
if cap_gb == 0 or cap_reset_day == 0 or rate_mbps == 0:
legacy = load_first_json(LIMITS_JSON_CANDIDATES)
if legacy:
if cap_gb == 0: cap_gb = int(legacy.get("cap_gb", 0) or 0)
if cap_reset_day == 0: cap_reset_day = int(legacy.get("cap_reset_day", 1) or 1)
if rate_mbps == 0: rate_mbps = int(legacy.get("rate_mbps", 0) or 0)
cap_gb = max(0, cap_gb)
cap_reset_day = min(28, max(1, cap_reset_day or 1))
rate_mbps = max(0, rate_mbps)
return cap_gb, cap_reset_day, rate_mbps
def load_usage(period_start_ts: int, period_end_ts: int, verbose: bool = False):
usage = {
"start_ts": period_start_ts,
"period_label": "",
"rx": 0,
"tx": 0,
"total": 0,
"cap_bytes": 0,
"cap_hit": False,
}
try:
with open(META_JSON, "r", encoding="utf-8") as f:
meta = json.load(f) or {}
except Exception:
meta = {}
if meta:
meta_start = int(meta.get("start_ts") or 0)
if meta_start >= period_start_ts and meta_start < period_end_ts:
for k in ("start_ts", "period_label", "rx", "tx", "total", "cap_bytes", "cap_hit"):
if k in meta:
usage[k] = meta[k]
if verbose:
print("Using usage from meta.json")
return usage
if verbose:
print("meta.json missing/out-of-period, estimating from stats.json")
try:
with open(STATS_JSON, "r", encoding="utf-8") as f:
s = json.load(f) or {}
rows = s.get("data") or []
except Exception:
rows = []
rows = sorted(rows, key=lambda r: int(r.get("t", 0)))
prev = None
total_rx = 0
total_tx = 0
for r in rows:
t = int(r.get("t", 0))
if t < period_start_ts or t >= period_end_ts:
continue
if prev is not None:
dr = max(0, int(r.get("read", 0)) - int(prev.get("read", 0)))
dw = max(0, int(r.get("written", 0)) - int(prev.get("written", 0)))
total_rx += dr
total_tx += dw
prev = r
usage["rx"] = total_rx
usage["tx"] = total_tx
usage["total"] = total_rx + total_tx
usage["start_ts"] = period_start_ts
usage["period_label"] = period_label(datetime.fromtimestamp(period_start_ts),
datetime.fromtimestamp(period_end_ts))
usage["cap_bytes"] = 0
usage["cap_hit"] = False
return usage
def update_meta_cap_hit(cap_hit: bool, period_start_ts: int, period_end_ts: int, verbose: bool = False):
try:
meta = {}
if os.path.isfile(META_JSON):
with open(META_JSON, "r", encoding="utf-8") as f:
meta = json.load(f) or {}
meta_start = int(meta.get("start_ts") or 0)
if not (period_start_ts <= meta_start < period_end_ts):
meta["start_ts"] = period_start_ts
meta["period_label"] = period_label(datetime.fromtimestamp(period_start_ts),
datetime.fromtimestamp(period_end_ts))
meta["cap_hit"] = bool(cap_hit)
save_json_atomic(META_JSON, meta)
if verbose:
print(f"meta.json updated: cap_hit={cap_hit}")
except Exception as e:
if verbose:
print(f"meta.json update skipped: {e}")
def stop_service(service: str, verbose: bool = False, dry_run: bool = False):
cmd = [SYSTEMCTL, "stop", service]
if verbose:
print("RUN:", " ".join(cmd) if not dry_run else "(dry-run) " + " ".join(cmd))
if not dry_run:
subprocess.run(cmd, check=False)
def main():
ap = argparse.ArgumentParser(description="SnowPanel monthly cap enforcer")
ap.add_argument("-v", "--verbose", action="store_true", help="verbose output")
ap.add_argument("--dry-run", action="store_true", help="do not stop the service")
ap.add_argument("--service", default="snowflake-proxy", help="systemd service name")
args = ap.parse_args()
cap_gb, reset_day, rate_mbps = load_limits()
start_dt, end_dt = period_bounds(reset_day)
start_ts = int(start_dt.timestamp())
end_ts = int(end_dt.timestamp())
usage = load_usage(start_ts, end_ts, verbose=args.verbose)
cap_bytes = usage.get("cap_bytes") or (cap_gb * (1024 ** 3))
total = int(usage.get("total") or 0)
if args.verbose:
print(f"Limits: cap_gb={cap_gb}, reset_day={reset_day}")
print(f"Usage : total={total}B cap={cap_bytes}B start_ts={start_ts}")
if cap_bytes > 0:
pct = min(100, round(total * 100 / cap_bytes)) if cap_bytes else 0
print(f" {pretty_bytes(total)} / {pretty_bytes(cap_bytes)} ({pct}%)")
else:
print(" Unlimited cap")
if cap_bytes > 0 and total >= cap_bytes:
update_meta_cap_hit(True, start_ts, end_ts, verbose=args.verbose)
if args.verbose:
print(f"Cap reached — stopping {args.service}.")
stop_service(args.service, verbose=args.verbose, dry_run=args.dry_run)
else:
update_meta_cap_hit(False, start_ts, end_ts, verbose=args.verbose)
if args.verbose:
print("Cap not reached — no action.")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit(130)