#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""SnowPanel monthly cap enforcer.

Reads the configured traffic cap, works out the current billing period,
loads the usage recorded for that period and stops the proxy service once
the cap has been reached.
"""
import argparse
import json
import os
import sys
import time
import subprocess
from datetime import datetime, timedelta
from typing import Optional

APP_JSON_CANDIDATES = ["/etc/snowpanel/app.json", "/var/lib/snowpanel/app.json"]
LIMITS_JSON_CANDIDATES = ["/etc/snowpanel/limits.json", "/var/lib/snowpanel/limits.json"]
STATE_DIR = "/var/lib/snowpanel"
META_JSON = os.path.join(STATE_DIR, "meta.json")
STATS_JSON = os.path.join(STATE_DIR, "stats.json")
SYSTEMCTL = "/bin/systemctl"

# Keys of meta.json that describe the usage of a period.
_USAGE_KEYS = ("start_ts", "period_label", "rx", "tx", "total", "cap_bytes", "cap_hit")
# Subset of _USAGE_KEYS holding the traffic counters themselves.
_COUNTER_KEYS = ("rx", "tx", "total")


def _to_int(value, default: int = 0) -> int:
    """Coerce *value* to int, returning *default* when it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        # Accept numeric strings such as "1.5" that int() alone rejects.
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def pretty_bytes(n: int) -> str:
    """Format a byte count using binary (1024-based) units up to TB.

    Plain bytes are rendered as an integer ("512 B"); larger units use two
    decimals ("1.50 KB"). Values beyond the TB range stay expressed in TB.
    """
    n = int(n)
    units = ["B", "KB", "MB", "GB", "TB"]
    i = 0
    f = float(n)
    while f >= 1024 and i < len(units) - 1:
        f /= 1024.0
        i += 1
    if i == 0:
        return f"{int(f)} {units[i]}"
    return f"{f:.2f} {units[i]}"


def load_first_json(paths):
    """Return the first JSON object that can be read from *paths*.

    Candidates that are missing, unreadable, not valid JSON, or whose top
    level is not an object are skipped. Returns ``{}`` when none qualifies,
    so callers can always use ``.get()`` on the result.
    """
    for p in paths:
        try:
            with open(p, "r", encoding="utf-8") as f:
                doc = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            continue
        if isinstance(doc, dict):
            return doc
    return {}


def save_json_atomic(path, obj):
    """Write *obj* as JSON to *path* via a temporary file and ``os.replace``.

    The temporary file is ``<path>.tmp``. If writing or renaming fails, the
    temporary file is removed (best effort) and the exception is re-raised.
    """
    tmp = f"{path}.tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)
    except Exception:
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise


def period_bounds(reset_day: int, now: Optional[datetime] = None):
    """Return ``(start, end)`` datetimes of the billing period containing *now*.

    Args:
        reset_day: Day of month on which the period starts; clamped to 1..28
            so the date exists in every month. Falsy/invalid values mean 1.
        now: Reference moment; defaults to ``datetime.now()``.

    Returns:
        A tuple of naive datetimes at midnight: the most recent reset day at
        or before *now*, and the same day of the following month.
    """
    day = max(1, min(28, _to_int(reset_day, 1) or 1))
    current = datetime.now() if now is None else now

    year, month = current.year, current.month
    if current.day < day:
        # The reset day has not occurred yet this month: period began last month.
        year, month = (year - 1, 12) if month == 1 else (year, month - 1)
    start = datetime(year, month, day, 0, 0, 0)

    # Step exactly one calendar month forward. Adding a fixed number of days
    # can overshoot (Jan 28 + 32 days is Mar 1 in a non-leap year).
    end_year, end_month = (year + 1, 1) if month == 12 else (year, month + 1)
    end = datetime(end_year, end_month, day, 0, 0, 0)
    return start, end


def period_label(start: datetime, end: datetime) -> str:
    """Return a short label such as ``"Jan 5 → Feb 5"`` for a period.

    The day is taken from ``.day`` rather than the ``%-d`` strftime directive,
    which is a platform-specific extension.
    """
    return f"{start.strftime('%b')} {start.day} → {end.strftime('%b')} {end.day}"


def load_limits():
    """Load ``(cap_gb, cap_reset_day, rate_mbps)`` from the configuration files.

    For each setting the first non-zero value wins, looking in order at the
    top level of app.json, its ``"limits"`` object, and the legacy
    limits.json (only read when some setting is still zero). Missing or
    malformed values count as zero.

    Returns:
        ``cap_gb`` and ``rate_mbps`` clamped to be non-negative, and
        ``cap_reset_day`` clamped to 1..28 (defaulting to 1).
    """
    cfg = load_first_json(APP_JSON_CANDIDATES)
    nested = cfg.get("limits")
    if not isinstance(nested, dict):
        nested = {}

    keys = ("cap_gb", "cap_reset_day", "rate_mbps")
    values = {k: _to_int(cfg.get(k)) or _to_int(nested.get(k)) for k in keys}

    if not all(values.values()):
        legacy = load_first_json(LIMITS_JSON_CANDIDATES)
        for k in keys:
            if values[k] == 0:
                values[k] = _to_int(legacy.get(k))

    cap_gb = max(0, values["cap_gb"])
    cap_reset_day = min(28, max(1, values["cap_reset_day"] or 1))
    rate_mbps = max(0, values["rate_mbps"])
    return cap_gb, cap_reset_day, rate_mbps


def load_usage(period_start_ts: int, period_end_ts: int, verbose: bool = False):
    """Return the usage record for the period ``[period_start_ts, period_end_ts)``.

    meta.json is used when its ``start_ts`` lies inside the period and it
    carries at least one traffic counter (``rx``, ``tx`` or ``total``).
    Otherwise usage is estimated from stats.json by summing the non-negative
    deltas of the ``read``/``written`` values between consecutive in-period
    samples.

    Returns:
        A dict with keys ``start_ts``, ``period_label``, ``rx``, ``tx``,
        ``total``, ``cap_bytes`` and ``cap_hit``. Values copied from
        meta.json are passed through as stored.
    """
    usage = {
        "start_ts": period_start_ts,
        "period_label": "",
        "rx": 0,
        "tx": 0,
        "total": 0,
        "cap_bytes": 0,
        "cap_hit": False,
    }

    meta = load_first_json([META_JSON])
    meta_start = _to_int(meta.get("start_ts"))
    # A meta.json holding only start_ts/cap_hit (as written by
    # update_meta_cap_hit when it creates the file) says nothing about
    # traffic, so it must not mask the stats.json estimate.
    has_counters = any(k in meta for k in _COUNTER_KEYS)
    if has_counters and period_start_ts <= meta_start < period_end_ts:
        for k in _USAGE_KEYS:
            if k in meta:
                usage[k] = meta[k]
        if "total" not in meta:
            usage["total"] = _to_int(usage["rx"]) + _to_int(usage["tx"])
        if verbose:
            print("Using usage from meta.json")
        return usage

    if verbose:
        print("meta.json missing/out-of-period, estimating from stats.json")

    data = load_first_json([STATS_JSON]).get("data")
    rows = [r for r in data if isinstance(r, dict)] if isinstance(data, list) else []
    in_period = sorted(
        (r for r in rows if period_start_ts <= _to_int(r.get("t")) < period_end_ts),
        key=lambda r: _to_int(r.get("t")),
    )

    total_rx = 0
    total_tx = 0
    for prev, cur in zip(in_period, in_period[1:]):
        # A negative delta means the counter restarted; count it as zero.
        total_rx += max(0, _to_int(cur.get("read")) - _to_int(prev.get("read")))
        total_tx += max(0, _to_int(cur.get("written")) - _to_int(prev.get("written")))

    usage["rx"] = total_rx
    usage["tx"] = total_tx
    usage["total"] = total_rx + total_tx
    usage["start_ts"] = period_start_ts
    usage["period_label"] = period_label(
        datetime.fromtimestamp(period_start_ts),
        datetime.fromtimestamp(period_end_ts),
    )
    usage["cap_bytes"] = 0
    usage["cap_hit"] = False
    return usage


def update_meta_cap_hit(cap_hit: bool, period_start_ts: int, period_end_ts: int, verbose: bool = False):
    """Record *cap_hit* in meta.json for the given period (best effort).

    When the stored ``start_ts`` lies outside the period, the record is
    rolled over: ``start_ts`` and ``period_label`` are set for the new period
    and the previous period's traffic counters are dropped so they cannot be
    mistaken for current usage. An unreadable or malformed meta.json is left
    untouched. Failures never propagate; they are printed when *verbose*.
    """
    try:
        try:
            with open(META_JSON, "r", encoding="utf-8") as f:
                meta = json.load(f) or {}
        except FileNotFoundError:
            meta = {}
        if not isinstance(meta, dict):
            raise ValueError("meta.json does not contain a JSON object")

        meta_start = _to_int(meta.get("start_ts"))
        if not (period_start_ts <= meta_start < period_end_ts):
            meta["start_ts"] = period_start_ts
            meta["period_label"] = period_label(
                datetime.fromtimestamp(period_start_ts),
                datetime.fromtimestamp(period_end_ts),
            )
            for stale in _COUNTER_KEYS:
                meta.pop(stale, None)
        meta["cap_hit"] = bool(cap_hit)
        save_json_atomic(META_JSON, meta)
        if verbose:
            print(f"meta.json updated: cap_hit={cap_hit}")
    except Exception as e:
        # Deliberately broad: bookkeeping must never abort enforcement.
        if verbose:
            print(f"meta.json update skipped: {e}")


def stop_service(service: str, verbose: bool = False, dry_run: bool = False):
    """Stop *service* with ``systemctl stop``.

    In dry-run mode the command is only printed (when *verbose*) and not run.

    Returns:
        True when the command succeeded or was skipped by dry-run; False when
        systemctl could not be started or exited non-zero (the reason is
        written to stderr).
    """
    cmd = [SYSTEMCTL, "stop", service]
    if verbose:
        print("RUN:", " ".join(cmd) if not dry_run else "(dry-run) " + " ".join(cmd))
    if dry_run:
        return True
    try:
        result = subprocess.run(cmd, check=False)
    except OSError as exc:
        print(f"Failed to run {SYSTEMCTL}: {exc}", file=sys.stderr)
        return False
    if result.returncode != 0:
        print(
            f"{SYSTEMCTL} stop {service} exited with status {result.returncode}",
            file=sys.stderr,
        )
        return False
    return True


def main():
    """Parse arguments, evaluate usage against the cap and enforce it."""
    ap = argparse.ArgumentParser(description="SnowPanel monthly cap enforcer")
    ap.add_argument("-v", "--verbose", action="store_true", help="verbose output")
    ap.add_argument("--dry-run", action="store_true", help="do not stop the service")
    ap.add_argument("--service", default="snowflake-proxy", help="systemd service name")
    args = ap.parse_args()

    # rate_mbps is loaded alongside the cap settings but not used here.
    cap_gb, reset_day, _rate_mbps = load_limits()
    start_dt, end_dt = period_bounds(reset_day)
    start_ts = int(start_dt.timestamp())
    end_ts = int(end_dt.timestamp())

    usage = load_usage(start_ts, end_ts, verbose=args.verbose)
    # A cap stored with the usage record takes precedence over the configured one.
    cap_bytes = _to_int(usage.get("cap_bytes")) or (cap_gb * (1024 ** 3))
    total = _to_int(usage.get("total"))

    if args.verbose:
        print(f"Limits: cap_gb={cap_gb}, reset_day={reset_day}")
        print(f"Usage : total={total}B cap={cap_bytes}B start_ts={start_ts}")
        if cap_bytes > 0:
            pct = min(100, round(total * 100 / cap_bytes))
            print(f" {pretty_bytes(total)} / {pretty_bytes(cap_bytes)} ({pct}%)")
        else:
            print(" Unlimited cap")

    if cap_bytes > 0 and total >= cap_bytes:
        update_meta_cap_hit(True, start_ts, end_ts, verbose=args.verbose)
        if args.verbose:
            print(f"Cap reached — stopping {args.service}.")
        stop_service(args.service, verbose=args.verbose, dry_run=args.dry_run)
    else:
        update_meta_cap_hit(False, start_ts, end_ts, verbose=args.verbose)
        if args.verbose:
            print("Cap not reached — no action.")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(130)