Files
rpi-tor-relay-panel/bin/torpanel-collect.py
2025-11-07 09:47:03 +01:00

60 lines
2.0 KiB
Python

#!/usr/bin/env python3
import json, os, socket, time
# Path of the Unix-domain socket that torctl_send() connects to.
CONTROL_SOCK = "/run/tor/control"
# File whose raw bytes are hex-encoded and sent with AUTHENTICATE.
COOKIE_FILE = "/run/tor/control.authcookie"
# JSON file holding the collected samples under the "data" key.
STATE_FILE = "/var/lib/torpanel/stats.json"
# Maximum number of samples kept; save_state() drops older entries.
ROLL_MAX = 45000
def _read_reply(sock):
    """Read one complete control-port reply from *sock* and return it as text.

    A reply is considered complete once its last line has the form
    ``NNN <text>`` (three-digit status code followed by a space);
    intermediate lines use ``NNN-`` or ``NNN+`` instead.  Reading also stops
    if the peer closes the connection, in which case whatever was received
    so far is returned.  A socket timeout propagates to the caller.
    """
    buf = b""
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            break
        buf += chunk
        if not buf.endswith(b"\r\n"):
            continue  # last line is still incomplete
        last_line = buf[:-2].rpartition(b"\r\n")[2]
        # Stop on any final status line, not only "250 OK": an error reply
        # such as "552 ..." would otherwise block until the timeout fires.
        if last_line[:3].isdigit() and last_line[3:4] == b" ":
            break
    return buf.decode(errors="ignore")


def torctl_send(cmds):
    """Run control-port commands against Tor and collect ``key=value`` results.

    Connects to the Unix socket at CONTROL_SOCK, authenticates with the
    hex-encoded contents of COOKIE_FILE, then sends each command in *cmds*
    in order and parses its reply.

    Args:
        cmds: iterable of command strings without the trailing CRLF,
            e.g. ``"GETINFO version"``.

    Returns:
        dict mapping each key to its value (both stripped) for every
        ``250`` reply line of the form ``key=value``.  Commands answered
        with a non-250 status contribute nothing, so callers should use
        ``.get()`` with a default.

    Raises:
        RuntimeError: if the AUTHENTICATE reply does not start with 250.
        OSError: on connection, cookie-file, or socket errors, including
            the 2.5 second socket timeout.
    """
    out = {}
    # Context managers guarantee the socket and cookie file are closed even
    # when authentication or a later command fails.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.settimeout(2.5)
        s.connect(CONTROL_SOCK)
        with open(COOKIE_FILE, "rb") as f:
            cookie = f.read().hex().upper()
        s.sendall(f"AUTHENTICATE {cookie}\r\n".encode())
        if not _read_reply(s).startswith("250"):
            raise RuntimeError("auth failed")
        for c in cmds:
            s.sendall((c + "\r\n").encode())
            for line in _read_reply(s).splitlines():
                # Only success lines carry results; error text that happens
                # to contain "=" must not end up in the output.
                # NOTE(review): multi-line "250+" data replies are not
                # parsed here; none of the current callers' keys appear to
                # need them - confirm before adding such keys.
                if not line.startswith(("250-", "250 ")):
                    continue
                key, sep, value = line[4:].partition("=")
                if sep:
                    out[key.strip()] = value.strip()
    return out
def load_state():
    """Load the persisted sample history from STATE_FILE.

    Returns:
        The parsed JSON object when it is a dict whose "data" entry is a
        list.  Otherwise (file missing or unreadable, invalid JSON, or an
        unexpected shape) a fresh ``{"data": []}`` is returned so the caller
        can always append to ``state["data"]``.
    """
    try:
        with open(STATE_FILE, "r") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file.  ValueError covers
        # json.JSONDecodeError and text decoding errors.  Narrower than a
        # bare except, so KeyboardInterrupt/SystemExit are not swallowed.
        return {"data": []}
    # A well-formed JSON file of the wrong shape would make every later
    # append fail; start over instead of failing on every run.
    if not isinstance(state, dict) or not isinstance(state.get("data"), list):
        return {"data": []}
    return state
def save_state(obj):
    """Persist *obj* to STATE_FILE, keeping only the newest ROLL_MAX samples.

    ``obj["data"]`` is rebound to its trimmed tail before writing.  The JSON
    is written compactly to a sibling ``.tmp`` file, flushed and fsynced,
    and then moved over STATE_FILE with os.replace so readers never see a
    partially written file.
    """
    history = obj["data"]
    obj["data"] = history[-ROLL_MAX:]

    staging_path = f"{STATE_FILE}.tmp"
    with open(staging_path, "w") as handle:
        json.dump(obj, handle, separators=(",", ":"))
        # Push the bytes to disk before the rename makes them visible.
        handle.flush()
        os.fsync(handle.fileno())

    os.replace(staging_path, STATE_FILE)
def main():
    """Collect one sample from Tor and append it to the persisted history.

    Queries traffic counters, circuit status and version via torctl_send(),
    builds a sample stamped with the current Unix time, appends it to the
    state returned by load_state() and writes it back with save_state().

    The run is best-effort: any failure skips this sample without raising,
    but the reason is reported on stderr instead of being silently dropped.
    """
    import sys  # local import: sys is not among the module-level imports

    try:
        info = torctl_send([
            "GETINFO traffic/read",
            "GETINFO traffic/written",
            "GETINFO status/circuit-established",
            "GETINFO version",
        ])
        now = int(time.time())
        state = load_state()
        state["data"].append({
            "t": now,
            # Missing keys fall back to 0 / "unknown" rather than failing.
            "read": int(info.get("traffic/read", "0")),
            "written": int(info.get("traffic/written", "0")),
            "circ": 1 if info.get("status/circuit-established", "0") == "1" else 0,
            "version": info.get("version", "unknown"),
        })
        save_state(state)
    except Exception as exc:
        # Top-level boundary: never crash the collector, but leave a trace
        # so a broken socket, cookie or state file can be diagnosed.
        print(f"torpanel-collect: sample skipped: {exc!r}", file=sys.stderr)


if __name__ == "__main__":
    main()