import hashlib import json import os import re import secrets import time from datetime import datetime, timedelta, timezone from pathlib import Path import smtplib from email.message import EmailMessage from urllib.parse import urlencode import psycopg from psycopg.types.json import Jsonb from flask import Flask, jsonify, make_response, request, redirect from werkzeug.security import check_password_hash, generate_password_hash USERNAME_RE = re.compile(r"^[a-z0-9_-]{3,16}$") COOKIE_NAME = os.environ.get("SESSION_COOKIE_NAME", "psopeeps_session") COOKIE_SECURE = os.environ.get("SESSION_COOKIE_SECURE", "true").lower() == "true" COOKIE_SAMESITE = os.environ.get("SESSION_COOKIE_SAMESITE", "Lax") SESSION_DAYS = int(os.environ.get("SESSION_DAYS", "30")) def utcnow(): return datetime.now(timezone.utc) def db_dsn(): return ( f"postgresql://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}" f"@{os.environ.get('POSTGRES_HOST', 'postgres')}:{os.environ.get('POSTGRES_PORT', '5432')}" f"/{os.environ['POSTGRES_DB']}" ) def connect(): return psycopg.connect(db_dsn(), autocommit=True) def init_db(): last_err = None for _ in range(30): try: with connect() as conn: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS site_users ( id BIGSERIAL PRIMARY KEY, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) cur.execute("ALTER TABLE site_users ADD COLUMN IF NOT EXISTS email TEXT") cur.execute("ALTER TABLE site_users ADD COLUMN IF NOT EXISTS email_verified_at TIMESTAMPTZ") cur.execute(""" CREATE UNIQUE INDEX IF NOT EXISTS site_users_email_lower_unique_idx ON site_users (lower(email)) WHERE email IS NOT NULL """) cur.execute(""" CREATE TABLE IF NOT EXISTS email_verification_tokens ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL REFERENCES site_users(id) ON DELETE CASCADE, email TEXT NOT NULL, token_hash TEXT NOT NULL UNIQUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), expires_at TIMESTAMPTZ NOT NULL, used_at TIMESTAMPTZ ) """) cur.execute(""" CREATE INDEX IF NOT EXISTS email_verification_tokens_user_id_idx ON email_verification_tokens(user_id) """) cur.execute(""" CREATE INDEX IF NOT EXISTS email_verification_tokens_expires_at_idx ON email_verification_tokens(expires_at) """) cur.execute(""" CREATE TABLE IF NOT EXISTS site_sessions ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL REFERENCES site_users(id) ON DELETE CASCADE, token_hash TEXT NOT NULL UNIQUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), expires_at TIMESTAMPTZ NOT NULL ) """) cur.execute(""" CREATE INDEX IF NOT EXISTS site_sessions_user_id_idx ON site_sessions(user_id) """) cur.execute(""" CREATE INDEX IF NOT EXISTS site_sessions_expires_at_idx ON site_sessions(expires_at) """) cur.execute(""" CREATE TABLE IF NOT EXISTS bb_accounts ( user_id BIGINT PRIMARY KEY REFERENCES site_users(id) ON DELETE CASCADE, account_id BIGINT NOT NULL UNIQUE, username TEXT NOT NULL UNIQUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS account_session_locks ( account_id BIGINT PRIMARY KEY, holder_source TEXT NOT NULL, source_region TEXT NOT NULL DEFAULT '', source_ship TEXT NOT NULL DEFAULT '', account_store TEXT NOT NULL DEFAULT 'shared', state TEXT NOT NULL DEFAULT 'active', sessions JSONB NOT NULL DEFAULT '{}'::jsonb, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), expires_at TIMESTAMPTZ NOT NULL ) """) cur.execute(""" CREATE INDEX IF NOT EXISTS account_session_locks_expires_at_idx ON account_session_locks(expires_at) """) cur.execute(""" CREATE INDEX IF NOT EXISTS account_session_locks_holder_source_idx ON account_session_locks(holder_source) """) return except Exception as e: last_err = e time.sleep(1) raise last_err def clean_username(value): value = (value or "").strip().lower() if not USERNAME_RE.match(value): return None return value EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") def clean_email(value): value = (value or "").strip().lower() if not value or len(value) > 254 or not EMAIL_RE.match(value): return None return value def public_base_url(): return os.environ.get("PUBLIC_BASE_URL", "https://psopeeps.online").rstrip("/") def user_email_payload(user): email = user.get("email") verified_at = user.get("email_verified_at") return { "email": email, "verified": bool(verified_at), "verified_at": verified_at.isoformat() if hasattr(verified_at, "isoformat") else verified_at, "required": True, } def create_email_verification_token(user_id, email): token = secrets.token_urlsafe(48) expires_at = utcnow() + timedelta(hours=24) with connect() as conn: with conn.cursor() as cur: cur.execute(""" UPDATE email_verification_tokens SET used_at = now() WHERE user_id = %s AND used_at IS NULL """, (user_id,)) cur.execute(""" INSERT INTO email_verification_tokens (user_id, email, token_hash, expires_at) VALUES (%s, %s, %s, %s) """, (user_id, email, token_hash(token), expires_at)) return token, expires_at def verification_link(token): return f"{public_base_url()}/api/email/verify?{urlencode({'token': token})}" def send_verification_email(to_email, username, link): smtp_host = os.environ.get("SMTP_HOST") smtp_port = int(os.environ.get("SMTP_PORT", "587")) smtp_user = os.environ.get("SMTP_USERNAME") smtp_password = os.environ.get("SMTP_PASSWORD") smtp_from = os.environ.get("SMTP_FROM", smtp_user or "no-reply@psopeeps.online") smtp_tls = os.environ.get("SMTP_TLS", "starttls").lower() if not smtp_host: if os.environ.get("EMAIL_DEBUG_SHOW_LINK", "").lower() in {"1", "true", "yes"}: return {"sent": False, "debug_link": link} raise RuntimeError("SMTP is not configured") msg = EmailMessage() msg["Subject"] = "Verify your PSO Peeps email" msg["From"] = smtp_from msg["To"] = to_email msg.set_content( f"Hi {username},\n\n" f"Verify your PSO Peeps email here:\n{link}\n\n" f"This link expires in 24 hours.\n" ) if smtp_tls == "ssl": with smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=20) as smtp: if smtp_user: smtp.login(smtp_user, smtp_password or "") smtp.send_message(msg) else: with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as smtp: if smtp_tls == "starttls": smtp.starttls() if smtp_user: smtp.login(smtp_user, smtp_password or "") smtp.send_message(msg) return {"sent": True} def require_verified_email(user): if user and user.get("email_verified_at"): return None return jsonify({ "ok": False, "error": "Please verify your email before changing game account settings.", "email_required": True, }), 403 def json_body(): data = request.get_json(silent=True) return data if isinstance(data, dict) else {} def token_hash(token): return hashlib.sha256(token.encode("utf-8")).hexdigest() def current_user(): token = request.cookies.get(COOKIE_NAME) if not token: return None with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT u.id, u.username, u.email, u.email_verified_at FROM site_sessions s JOIN site_users u ON u.id = s.user_id WHERE s.token_hash = %s AND s.expires_at > now() LIMIT 1 """, (token_hash(token),)) return cur.fetchone() def issue_session(user_id): token = secrets.token_urlsafe(48) expires_at = utcnow() + timedelta(days=SESSION_DAYS) with connect() as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO site_sessions (user_id, token_hash, expires_at) VALUES (%s, %s, %s) """, (user_id, token_hash(token), expires_at)) return token, expires_at def set_session_cookie(resp, token, expires_at): resp.set_cookie( COOKIE_NAME, token, expires=expires_at, httponly=True, secure=COOKIE_SECURE, samesite=COOKIE_SAMESITE, path="/", ) def clear_session_cookie(resp): resp.delete_cookie(COOKIE_NAME, path="/") def fnv1a32(text): h = 0x811C9DC5 for b in text.encode("utf-8"): h ^= b h = (h * 0x01000193) & 0xFFFFFFFF return h def bb_account_id(username): return fnv1a32(username) & 0x7FFFFFFF def account_id_str(account_id): return f"{int(account_id):010d}" def validate_bb_password(password): if not password: return "Blue Burst password is required." if len(password) > 16: return "Blue Burst password must be 16 characters or fewer." return None def license_text(account_id, username, password): u = json.dumps(username) p = json.dumps(password) return f'''{{ "BBTeamID": 0x0, "FormatVersion": 0x1, "AccountID": 0x{account_id:X}, "LastPlayerName": {u}, "DCNTELicenses": [], "BBLicenses": [ {{"UserName": {u}, "Password": {p}}} ], "BanEndTime": 0x0, "PCLicenses": [], "AutoReplyMessage": "", "GCLicenses": [], "AutoPatchesEnabled": ["MomokaItemExchangeFix", "DisableIdleDisconnect", "FastTekker", "Palette", "DrawDistance", "EnemyHPBars", "HungryMagSound"], "XBLicenses": [], "Flags": 0x7FFFFFFF, "Ep3TotalMesetaEarned": 0x0, "Ep3CurrentMeseta": 0x0, "DCLicenses": [], "UserFlags": 0x0 }}''' def account_root(account_id): root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) return root / "canonical" / "accounts" / account_id_str(account_id) def canonical_license_path(account_id): aid = account_id_str(account_id) return account_root(account_id) / "system" / "licenses" / f"{aid}.json" def sha256_file(path): h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def refresh_account_manifest(account_id): root = account_root(account_id) manifest_path = root / "manifest.json" if not root.is_dir(): return None existing = {} if manifest_path.exists(): try: existing = json.loads(manifest_path.read_text()) except Exception: existing = {} old_files = existing.get("files", {}) if isinstance(existing.get("files"), dict) else {} new_files = {} now = int(time.time()) for path in sorted(x for x in root.rglob("*") if x.is_file()): rel = path.relative_to(root).as_posix() if rel == "manifest.json": continue st = path.stat() digest = sha256_file(path) old = old_files.get(rel, {}) if ( old.get("sha256") == digest and int(old.get("size", -1)) == st.st_size and int(old.get("mtime_ns", -1)) == st.st_mtime_ns ): new_files[rel] = old else: new_files[rel] = { "inbox_path": f"site-generated:{rel}", "mtime_ns": st.st_mtime_ns, "promoted_at": now, "relative_path": rel, "sha256": digest, "size": st.st_size, "source": "site", } manifest = dict(existing) manifest["files"] = new_files manifest["updated_at"] = now tmp = manifest_path.with_suffix(".json.tmp") tmp.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n") tmp.replace(manifest_path) return sha256_file(manifest_path) def canonical_manifest_sha256(account_id): manifest = account_root(account_id) / "manifest.json" if not manifest.exists(): return None return sha256_file(manifest) def read_json_file(path): try: return json.loads(path.read_text()) except Exception: return None def bb_sync_info(account_id): aid = account_id_str(account_id) root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) manifest_sha = canonical_manifest_sha256(account_id) regions = { "us": { "host": "psopeeps_us", "targets": ["us-live", "us-hardcore"], "status": "pending", "applied_at": None, }, "eu": { "host": "psopeeps_eu", "targets": ["eu-live", "eu-hardcore"], "status": "pending", "applied_at": None, }, } if not manifest_sha: return { "status": "pending", "manifest_sha256": None, "regions": regions, } for region, info in regions.items(): applied_times = [] all_current = True for target in info["targets"]: state_path = root / "state" / "applied" / f"{info['host']}.{target}.site.{aid}.json" state = read_json_file(state_path) if state and state.get("applied_at"): applied_times.append(state.get("applied_at")) if not state or state.get("manifest_sha256") != manifest_sha: all_current = False info["status"] = "current" if all_current else "pending" info["applied_at"] = max(applied_times) if applied_times else None overall = "current" if all(x["status"] == "current" for x in regions.values()) else "pending" return { "status": overall, "manifest_sha256": manifest_sha, "regions": regions, } def account_sync_current_on_all_regions(account_id): sync = bb_sync_info(account_id) return sync.get("status") == "current", sync def bb_payload(account_id, username): sync = bb_sync_info(account_id) return { "created": True, "ready": sync["status"] == "current", "sync_status": sync["status"], "username": username, "account_id": account_id_str(account_id), "regions": sync["regions"], } def enqueue_account_sync(account_id, reason): root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) queue = root / "queue" / "apply" queue.mkdir(parents=True, exist_ok=True) aid = account_id_str(account_id) payload = { "account": aid, "reason": reason, "queued_at": int(time.time()), } final = queue / f"{int(time.time())}.{time.time_ns()}.{aid}.json" tmp = queue / f".{final.name}.tmp" tmp.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n") tmp.replace(final) return final def write_bb_license(account_id, username, password): path = canonical_license_path(account_id) path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(license_text(account_id, username, password) + "\n") tmp.replace(path) refresh_account_manifest(account_id) enqueue_account_sync(account_id, "bb_license_updated") return path app = Flask(__name__) init_db() @app.after_request def add_headers(resp): resp.headers["Cache-Control"] = "no-store" return resp @app.get("/health") def health(): return jsonify({"ok": True}) # --- newserv account login lock coordinator ----------------------------------- def account_lock_lease_seconds(): return int(os.environ.get("ACCOUNT_LOCK_LEASE_SECONDS", "7200")) def account_lock_expires_at(): return utcnow() + timedelta(seconds=account_lock_lease_seconds()) def require_newserv_shared_secret(): expected = ( os.environ.get("ACCOUNT_SYNC_SHARED_SECRET") or os.environ.get("NEWSERV_SHARED_SECRET") or "" ) if not expected: return jsonify({"ok": False, "error": "newserv account sync API is disabled"}), 403 got = request.headers.get("X-Psopeeps-Admin-Secret", "") if not secrets.compare_digest(got, expected): return jsonify({"ok": False, "error": "forbidden"}), 403 return None def clean_lock_account_id(value): try: if isinstance(value, str): value = value.strip() if not value: return None return int(value, 10) return int(value) except Exception: return None def clean_lock_text(value, max_len=128): value = str(value or "").strip() if len(value) > max_len: value = value[:max_len] return value def lock_row_payload(row): if not row: return None sessions = row.get("sessions") or {} if isinstance(sessions, str): try: sessions = json.loads(sessions) except Exception: sessions = {} return { "account_id": account_id_str(row["account_id"]), "holder_source": row["holder_source"], "source_region": row.get("source_region") or "", "source_ship": row.get("source_ship") or "", "account_store": row.get("account_store") or "", "state": row.get("state") or "active", "sessions": sessions, "session_count": len(sessions), "created_at": row["created_at"].isoformat() if row.get("created_at") else None, "updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None, "expires_at": row["expires_at"].isoformat() if row.get("expires_at") else None, } @app.post("/api/newserv/account-locks/acquire") def newserv_account_lock_acquire(): auth_error = require_newserv_shared_secret() if auth_error: return auth_error data = json_body() account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str")) source = clean_lock_text(data.get("source"), 64) source_region = clean_lock_text(data.get("source_region"), 32) source_ship = clean_lock_text(data.get("source_ship"), 32) account_store = clean_lock_text(data.get("account_store") or "shared", 32) version = clean_lock_text(data.get("version"), 64) session_nonce = clean_lock_text(data.get("session_nonce"), 160) if not account_id: return jsonify({"ok": False, "message": "missing account_id"}), 400 if not source: return jsonify({"ok": False, "message": "missing source"}), 400 if not session_nonce: return jsonify({"ok": False, "message": "missing session_nonce"}), 400 now = utcnow() expires_at = account_lock_expires_at() session_info = { "source": source, "source_region": source_region, "source_ship": source_ship, "account_store": account_store, "version": version, "created_at": now.isoformat(), "updated_at": now.isoformat(), } with connect() as conn: with conn.transaction(): with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at FROM account_session_locks WHERE account_id = %s FOR UPDATE """, (account_id,)) row = cur.fetchone() if row and row["expires_at"] and row["expires_at"] <= now: cur.execute(""" DELETE FROM account_session_locks WHERE account_id = %s """, (account_id,)) row = None if not row: sessions = {session_nonce: session_info} cur.execute(""" INSERT INTO account_session_locks ( account_id, holder_source, source_region, source_ship, account_store, state, sessions, expires_at ) VALUES (%s, %s, %s, %s, %s, 'active', %s, %s) RETURNING account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at """, ( account_id, source, source_region, source_ship, account_store, Jsonb(sessions), expires_at, )) row = cur.fetchone() return jsonify({ "ok": True, "session_nonce": session_nonce, "state": row["state"], "holder_source": row["holder_source"], "lock": lock_row_payload(row), }) if row["holder_source"] != source: return jsonify({ "ok": False, "message": f"$C6Account is already active\\non {row['holder_source']}.", "holder_source": row["holder_source"], "state": row["state"], "lock": lock_row_payload(row), }) sessions = row["sessions"] or {} if isinstance(sessions, str): sessions = json.loads(sessions) sessions[session_nonce] = session_info cur.execute(""" UPDATE account_session_locks SET state = 'active', source_region = %s, source_ship = %s, account_store = %s, sessions = %s, updated_at = now(), expires_at = %s WHERE account_id = %s RETURNING account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at """, ( source_region, source_ship, account_store, Jsonb(sessions), expires_at, account_id, )) row = cur.fetchone() return jsonify({ "ok": True, "session_nonce": session_nonce, "state": row["state"], "holder_source": row["holder_source"], "lock": lock_row_payload(row), }) @app.post("/api/newserv/account-locks/heartbeat") def newserv_account_lock_heartbeat(): auth_error = require_newserv_shared_secret() if auth_error: return auth_error data = json_body() source = clean_lock_text(data.get("source"), 64) source_region = clean_lock_text(data.get("source_region"), 32) source_ship = clean_lock_text(data.get("source_ship"), 32) if not source: return jsonify({"ok": False, "message": "missing source"}), 400 expires_at = account_lock_expires_at() with connect() as conn: with conn.cursor() as cur: cur.execute(""" UPDATE account_session_locks SET updated_at = now(), expires_at = %s WHERE holder_source = %s AND state IN ('active', 'draining') """, (expires_at, source)) refreshed = cur.rowcount reaped, kept = reap_draining_account_locks() return jsonify({ "ok": True, "source": source, "source_region": source_region, "source_ship": source_ship, "refreshed": refreshed, "expires_at": expires_at.isoformat(), "reaped_draining_count": len(reaped), "kept_draining_count": len(kept), }) @app.post("/api/newserv/account-locks/session-end") def newserv_account_lock_session_end(): auth_error = require_newserv_shared_secret() if auth_error: return auth_error data = json_body() account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str")) source = clean_lock_text(data.get("source"), 64) session_nonce = clean_lock_text(data.get("session_nonce"), 160) version = clean_lock_text(data.get("version"), 64) if not account_id: return jsonify({"ok": False, "message": "missing account_id"}), 400 if not source: return jsonify({"ok": False, "message": "missing source"}), 400 if not session_nonce: return jsonify({"ok": False, "message": "missing session_nonce"}), 400 now = utcnow() expires_at = account_lock_expires_at() with connect() as conn: with conn.transaction(): with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at FROM account_session_locks WHERE account_id = %s FOR UPDATE """, (account_id,)) row = cur.fetchone() if not row: return jsonify({ "ok": True, "account_id": account_id_str(account_id), "source": source, "session_nonce": session_nonce, "version": version, "removed": False, "state": "unlocked", "remaining_sessions": 0, "message": "no active lock", }) if row["expires_at"] and row["expires_at"] <= now: cur.execute(""" DELETE FROM account_session_locks WHERE account_id = %s """, (account_id,)) return jsonify({ "ok": True, "account_id": account_id_str(account_id), "source": source, "session_nonce": session_nonce, "version": version, "removed": False, "state": "expired", "remaining_sessions": 0, "message": "expired lock removed", }) if row["holder_source"] != source: return jsonify({ "ok": False, "account_id": account_id_str(account_id), "source": source, "session_nonce": session_nonce, "holder_source": row["holder_source"], "state": row["state"], "message": "session source does not hold this lock", "lock": lock_row_payload(row), }), 409 sessions = row["sessions"] or {} if isinstance(sessions, str): sessions = json.loads(sessions) removed = session_nonce in sessions if removed: sessions.pop(session_nonce, None) if sessions: cur.execute(""" UPDATE account_session_locks SET state = 'active', sessions = %s, updated_at = now(), expires_at = %s WHERE account_id = %s RETURNING account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at """, (Jsonb(sessions), expires_at, account_id)) row = cur.fetchone() else: cur.execute(""" UPDATE account_session_locks SET state = 'draining', sessions = '{}'::jsonb, updated_at = now(), expires_at = %s WHERE account_id = %s RETURNING account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at """, (expires_at, account_id)) row = cur.fetchone() return jsonify({ "ok": True, "account_id": account_id_str(account_id), "source": source, "session_nonce": session_nonce, "version": version, "removed": removed, "state": row["state"], "remaining_sessions": len(row["sessions"] or {}), "lock": lock_row_payload(row), }) def reap_draining_account_locks(): reaped = [] kept = [] with connect() as conn: with conn.transaction(): with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at FROM account_session_locks WHERE state = 'draining' ORDER BY updated_at ASC FOR UPDATE """) rows = list(cur.fetchall()) for row in rows: account_id = row["account_id"] current, sync = account_sync_current_on_all_regions(account_id) if current: cur.execute(""" DELETE FROM account_session_locks WHERE account_id = %s """, (account_id,)) reaped.append({ "account_id": account_id_str(account_id), "holder_source": row["holder_source"], "sync_status": sync.get("status"), }) else: kept.append({ "account_id": account_id_str(account_id), "holder_source": row["holder_source"], "sync_status": sync.get("status"), "regions": sync.get("regions"), }) return reaped, kept @app.post("/api/newserv/account-locks/reap-draining") def newserv_account_lock_reap_draining(): auth_error = require_newserv_shared_secret() if auth_error: return auth_error reaped, kept = reap_draining_account_locks() return jsonify({ "ok": True, "reaped": reaped, "kept": kept, "reaped_count": len(reaped), "kept_count": len(kept), }) @app.get("/api/newserv/account-locks/") def newserv_account_lock_status(account_id): auth_error = require_newserv_shared_secret() if auth_error: return auth_error with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT account_id, holder_source, source_region, source_ship, account_store, state, sessions, created_at, updated_at, expires_at FROM account_session_locks WHERE account_id = %s """, (account_id,)) row = cur.fetchone() return jsonify({ "ok": True, "account_id": account_id_str(account_id), "locked": bool(row), "lock": lock_row_payload(row), }) @app.post("/api/newserv/account-locks//admin-unlock") def newserv_account_lock_admin_unlock(account_id): auth_error = require_newserv_shared_secret() if auth_error: return auth_error with connect() as conn: with conn.cursor() as cur: cur.execute(""" DELETE FROM account_session_locks WHERE account_id = %s """, (account_id,)) return jsonify({ "ok": True, "account_id": account_id_str(account_id), "unlocked": True, }) @app.get("/me") def me(): user = current_user() if not user: return jsonify({"authenticated": False}) return jsonify({ "authenticated": True, "user": { "id": user["id"], "username": user["username"], }, "email": user_email_payload(user), }) def local_syncer_save_summary(account_id): import json import os from pathlib import Path from datetime import datetime, timezone account = account_id_str(account_id) root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) applied_dir = root / "state" / "applied" paths = { "us": [ applied_dir / f"psopeeps_us.us-live.site.{account}.json", applied_dir / f"psopeeps_us.us-test.site.{account}.json", applied_dir / f"psopeeps_us.us-hardcore.site.{account}.json", ], "eu": [ applied_dir / f"psopeeps_eu.eu-live.site.{account}.json", applied_dir / f"psopeeps_eu.eu-test.site.{account}.json", applied_dir / f"psopeeps_eu.eu-hardcore.site.{account}.json", ], } def parse_time(value): if not value: return None try: return datetime.fromisoformat(str(value).replace("Z", "+00:00")) except Exception: return None regions = {} for region, path in paths.items(): info = { "status": "unknown", "label": "Not seen", "style": "warn", "host": None, "applied_at": None, "manifest_sha256": None, } states = [] errors = [] for path in paths[region]: if not path.exists(): continue try: data = json.loads(path.read_text()) data["_path"] = str(path) states.append(data) except Exception as e: errors.append(f"{path.name}: {e}") if states: latest_state = max(states, key=lambda x: str(x.get("applied_at") or "")) hashes = {x.get("manifest_sha256") for x in states if x.get("manifest_sha256")} info.update({ "status": "seen", "label": "Seen", "style": "warn", "host": latest_state.get("host"), "applied_at": latest_state.get("applied_at"), "manifest_sha256": latest_state.get("manifest_sha256"), "targets_seen": len(states), "targets_expected": len(paths[region]), "all_targets_same_hash": len(hashes) == 1, }) if errors: info["errors"] = errors regions[region] = info us_hash = regions["us"].get("manifest_sha256") eu_hash = regions["eu"].get("manifest_sha256") if us_hash and eu_hash and us_hash == eu_hash: for region in ("us", "eu"): regions[region]["status"] = "current" regions[region]["label"] = "Current" regions[region]["style"] = "good" times = [ t for t in ( parse_time(regions["us"].get("applied_at")), parse_time(regions["eu"].get("applied_at")), ) if t ] latest = max(times) if times else None return { "status": "current", "safe": True, "regions": regions, "last_sync": latest.isoformat() if latest else None, "message": "", } if not us_hash and not eu_hash: return { "status": "unknown", "safe": False, "regions": regions, "last_sync": None, "message": "No mirrored save data has been seen for this account yet.", } us_time = parse_time(regions["us"].get("applied_at")) or datetime.fromtimestamp(0, tz=timezone.utc) eu_time = parse_time(regions["eu"].get("applied_at")) or datetime.fromtimestamp(0, tz=timezone.utc) newest = "us" if us_time >= eu_time else "eu" older = "eu" if newest == "us" else "us" regions[newest]["status"] = "newest" regions[newest]["label"] = "Newest" regions[newest]["style"] = "good" regions[older]["status"] = "pending" regions[older]["label"] = "Pending" regions[older]["style"] = "warn" latest = max(us_time, eu_time) return { "status": "pending", "safe": False, "regions": regions, "last_sync": latest.isoformat() if latest else None, "message": "Save sync pending. If this does not clear soon, contact an admin in Discord.", } @app.get("/account") def account(): user = current_user() if not user: return jsonify({"authenticated": False}), 401 with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT account_id, username FROM bb_accounts WHERE user_id = %s LIMIT 1 """, (user["id"],)) bb = cur.fetchone() return jsonify({ "authenticated": True, "user": { "id": user["id"], "username": user["username"], }, "email": user_email_payload(user), "bb": bb_payload(bb["account_id"], bb["username"]) if bb else { "created": False, "ready": False, "sync_status": "missing", "username": user["username"], "account_id": None, "regions": { "us": {"host": "psopeeps_us", "status": "missing", "applied_at": None}, "eu": {"host": "psopeeps_eu", "status": "missing", "applied_at": None}, }, }, "v2_v3_keys": [], "save_sync": local_syncer_save_summary(bb["account_id"]) if bb else { "status": "missing", "message": "Create a Blue Burst account before save sync status is available.", "last_sync": None, "regions": { "us": {"host": "psopeeps_us", "status": "missing", "applied_at": None}, "eu": {"host": "psopeeps_eu", "status": "missing", "applied_at": None}, }, }, }) @app.post("/email/start") def email_start(): user = current_user() if not user: return jsonify({"ok": False, "error": "Not signed in."}), 401 data = json_body() email = clean_email(data.get("email")) if not email: return jsonify({"ok": False, "error": "Enter a valid email address."}), 400 try: with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" UPDATE site_users SET email = %s, email_verified_at = NULL, updated_at = now() WHERE id = %s RETURNING id, username, email, email_verified_at """, (email, user["id"])) updated = cur.fetchone() except psycopg.errors.UniqueViolation: return jsonify({"ok": False, "error": "That email is already in use."}), 409 token, expires_at = create_email_verification_token(updated["id"], updated["email"]) result = send_verification_email( updated["email"], updated["username"], verification_link(token), ) return jsonify({ "ok": True, "email": user_email_payload(updated), "verification": result, }) @app.post("/email/resend") def email_resend(): user = current_user() if not user: return jsonify({"ok": False, "error": "Not signed in."}), 401 email = clean_email(user.get("email")) if not email: return jsonify({"ok": False, "error": "No email address is set."}), 400 if user.get("email_verified_at"): return jsonify({"ok": True, "email": user_email_payload(user), "already_verified": True}) token, expires_at = create_email_verification_token(user["id"], email) result = send_verification_email( email, user["username"], verification_link(token), ) return jsonify({ "ok": True, "email": user_email_payload(user), "verification": result, }) @app.get("/email/verify") def email_verify(): token = request.args.get("token") or "" if not token: return redirect("/account-unverified.html?error=missing-token") th = token_hash(token) with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT id, user_id, email FROM email_verification_tokens WHERE token_hash = %s AND used_at IS NULL AND expires_at > now() LIMIT 1 """, (th,)) row = cur.fetchone() if not row: return redirect("/account-unverified.html?error=invalid-token") cur.execute(""" UPDATE site_users SET email = %s, email_verified_at = now(), updated_at = now() WHERE id = %s """, (row["email"], row["user_id"])) cur.execute(""" UPDATE email_verification_tokens SET used_at = now() WHERE id = %s """, (row["id"],)) return redirect("/account.html?verified=1") @app.post("/register") def register(): data = json_body() username = clean_username(data.get("username")) email = clean_email(data.get("email")) password = data.get("password") or "" if not username: return jsonify({ "ok": False, "error": "Username must be 3-16 characters: lowercase letters, numbers, underscore, or hyphen.", }), 400 if not email: return jsonify({"ok": False, "error": "Enter a valid email address."}), 400 if len(password) < 8: return jsonify({"ok": False, "error": "Password must be at least 8 characters."}), 400 try: with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" INSERT INTO site_users (username, email, password_hash) VALUES (%s, %s, %s) RETURNING id, username, email, email_verified_at """, (username, email, generate_password_hash(password))) user = cur.fetchone() except psycopg.errors.UniqueViolation: return jsonify({"ok": False, "error": "That username or email is already taken."}), 409 verify_token, verify_expires_at = create_email_verification_token(user["id"], user["email"]) verify_result = send_verification_email( user["email"], user["username"], verification_link(verify_token), ) token, expires_at = issue_session(user["id"]) resp = make_response(jsonify({ "ok": True, "user": { "id": user["id"], "username": user["username"], }, "email": user_email_payload(user), "verification": verify_result, })) set_session_cookie(resp, token, expires_at) return resp @app.post("/login") def login(): data = json_body() username = clean_username(data.get("username")) password = data.get("password") or "" if not username or not password: return jsonify({"ok": False, "error": "Username and password are required."}), 400 with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT id, username, password_hash, email, email_verified_at FROM site_users WHERE username = %s LIMIT 1 """, (username,)) user = cur.fetchone() if not user or not check_password_hash(user["password_hash"], password): return jsonify({"ok": False, "error": "Invalid username or password."}), 401 token, expires_at = issue_session(user["id"]) resp = make_response(jsonify({ "ok": True, "user": { "id": user["id"], "username": user["username"], }, })) set_session_cookie(resp, token, expires_at) return resp @app.post("/password/change") def password_change(): user = current_user() if not user: return jsonify({"ok": False, "error": "Not signed in."}), 401 data = json_body() current_password = data.get("current_password") or data.get("currentPassword") or "" new_password = data.get("password") or "" confirm = data.get("confirm_password") or data.get("confirmPassword") or "" if len(new_password) < 8: return jsonify({"ok": False, "error": "Password must be at least 8 characters."}), 400 if new_password != confirm: return jsonify({"ok": False, "error": "Passwords do not match."}), 400 with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT id, password_hash FROM site_users WHERE id = %s LIMIT 1 """, (user["id"],)) row = cur.fetchone() if not row or not check_password_hash(row["password_hash"], current_password): return jsonify({"ok": False, "error": "Current password is incorrect."}), 401 cur.execute(""" UPDATE site_users SET password_hash = %s, updated_at = now() WHERE id = %s """, (generate_password_hash(new_password), user["id"])) return jsonify({"ok": True}) @app.post("/logout") def logout(): token = request.cookies.get(COOKIE_NAME) if token: with connect() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM site_sessions WHERE token_hash = %s", (token_hash(token),)) resp = make_response(jsonify({"ok": True})) clear_session_cookie(resp) return resp @app.post("/bb/create") def bb_create(): user = current_user() if not user: return jsonify({"ok": False, "error": "Not signed in."}), 401 email_error = require_verified_email(user) if email_error: return email_error data = json_body() password = data.get("password") or "" confirm = data.get("confirm_password") or data.get("confirmPassword") or "" err = validate_bb_password(password) if err: return jsonify({"ok": False, "error": err}), 400 if password != confirm: return jsonify({"ok": False, "error": "Blue Burst passwords do not match."}), 400 username = user["username"] account_id = bb_account_id(username) with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute("SELECT user_id FROM bb_accounts WHERE user_id = %s", (user["id"],)) if cur.fetchone(): return jsonify({"ok": False, "error": "Blue Burst account already exists."}), 409 write_bb_license(account_id, username, password) cur.execute(""" INSERT INTO bb_accounts (user_id, account_id, username) VALUES (%s, %s, %s) RETURNING account_id, username """, (user["id"], account_id, username)) bb = cur.fetchone() return jsonify({"ok": True, "bb": bb_payload(bb["account_id"], bb["username"])}) @app.post("/bb/change-password") def bb_change_password(): user = current_user() if not user: return jsonify({"ok": False, "error": "Not signed in."}), 401 email_error = require_verified_email(user) if email_error: return email_error data = json_body() password = data.get("password") or "" confirm = data.get("confirm_password") or data.get("confirmPassword") or "" err = validate_bb_password(password) if err: return jsonify({"ok": False, "error": err}), 400 if password != confirm: return jsonify({"ok": False, "error": "Blue Burst passwords do not match."}), 400 with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT account_id, username FROM bb_accounts WHERE user_id = %s LIMIT 1 """, (user["id"],)) bb = cur.fetchone() if not bb: return jsonify({"ok": False, "error": "Blue Burst account does not exist yet."}), 404 write_bb_license(bb["account_id"], bb["username"], password) cur.execute(""" UPDATE bb_accounts SET updated_at = now() WHERE user_id = %s """, (user["id"],)) return jsonify({"ok": True, "bb": bb_payload(bb["account_id"], bb["username"])}) # Register V2 key profile API routes. from key_routes import register_key_routes register_key_routes( app, connect=connect, current_user=current_user, jsonify=jsonify, request=request, account_id_str=account_id_str, bb_account_id=bb_account_id, canonical_license_path=canonical_license_path, refresh_account_manifest=refresh_account_manifest, enqueue_account_sync=enqueue_account_sync, bb_sync_info=bb_sync_info, ) # --- Hardcore stats aggregator ------------------------------------------------- # Combines EU + US local Hardcore stats APIs for the website. # Source rows are kept conceptually separate, then merged by stable character key. import urllib.request as _hc_urllib_request import urllib.error as _hc_urllib_error from datetime import datetime as _hc_datetime, timezone as _hc_timezone def _hc_source_urls(): sources = [] eu = (os.environ.get("HARDCORE_STATS_EU_URL") or "").strip().rstrip("/") us = (os.environ.get("HARDCORE_STATS_US_URL") or "").strip().rstrip("/") if eu: sources.append(("eu", eu)) if us: sources.append(("us", us)) return sources def _hc_fetch_json(url, timeout=30): req = _hc_urllib_request.Request( url, headers={"User-Agent": "psopeeps-site-hardcore-aggregator/1.0"}, ) with _hc_urllib_request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8", errors="replace") return json.loads(raw) def _hc_get_source_characters(): rows = [] errors = [] for source_name, base_url in _hc_source_urls(): url = f"{base_url}/api/hardcore/characters" try: data = _hc_fetch_json(url) if isinstance(data, list): for row in data: if isinstance(row, dict): r = dict(row) r["_source_ship"] = source_name r["_source_url"] = base_url rows.append(r) else: errors.append({ "source_ship": source_name, "url": url, "error": "response was not a list", }) except Exception as e: errors.append({ "source_ship": source_name, "url": url, "error": str(e), }) return rows, errors def _hc_int(value, default=0): try: if value is None: return default return int(value) except Exception: return default def _hc_bool(value, default=True): if isinstance(value, bool): return value if value is None: return default if isinstance(value, str): return value.strip().lower() not in {"0", "false", "no", "dead"} return bool(value) def _hc_sort_ts(value): if not value: return "" return str(value) def _hc_character_key(row): guild_card = _hc_int(row.get("guild_card")) slot = _hc_int(row.get("character_slot")) creation_ts = _hc_int(row.get("character_creation_timestamp")) # Real creation timestamps are stable across synced saves and should be used. if creation_ts > 0: return (guild_card, slot, creation_ts) # Legacy timestamp-0 rows are kept separate by name so an old dead/test row # does not poison the current real character row. name = str(row.get("character_name") or "") return (guild_card, slot, f"legacy:{name}") def _hc_pick_display_row(rows): return sorted( rows, key=lambda r: ( _hc_int(r.get("total_exp")), _hc_int(r.get("level")), _hc_int(r.get("play_time_seconds")), _hc_sort_ts(r.get("updated_at")), ), reverse=True, )[0] def _hc_merge_character_rows(rows): by_key = {} for row in rows: by_key.setdefault(_hc_character_key(row), []).append(row) combined = [] for key, group in by_key.items(): display = _hc_pick_display_row(group) source_ships = sorted({ str(r.get("_source_ship") or "") for r in group if r.get("_source_ship") }) # Kills aggregate across source rows in the same character identity group. total_kills = sum(_hc_int(r.get("total_enemies_killed")) for r in group) # Higher numeric values win for progression fields. level = max(_hc_int(r.get("level"), 1) for r in group) total_exp = max(_hc_int(r.get("total_exp")) for r in group) play_time_seconds = max(_hc_int(r.get("play_time_seconds")) for r in group) # Death anywhere in the same identity group makes the combined row dead. alive = all(_hc_bool(r.get("alive"), True) for r in group) updated_at = max((str(r.get("updated_at") or "") for r in group), default="") last_seen_at = max((str(r.get("last_seen_at") or "") for r in group), default="") dead_at_values = [str(r.get("dead_at")) for r in group if r.get("dead_at")] dead_at = min(dead_at_values) if dead_at_values else None merged = dict(display) merged.pop("_source_url", None) merged["source_ships"] = source_ships merged["source_count"] = len(group) merged["guild_card"] = _hc_int(display.get("guild_card")) merged["character_slot"] = _hc_int(display.get("character_slot")) merged["character_creation_timestamp"] = _hc_int(display.get("character_creation_timestamp")) merged["level"] = level merged["total_exp"] = total_exp merged["play_time_seconds"] = play_time_seconds merged["total_enemies_killed"] = total_kills merged["alive"] = alive merged["dead_at"] = dead_at _hc_apply_canonical_death_overlay(merged) merged["last_seen_at"] = last_seen_at or None merged["updated_at"] = updated_at or None combined.append(merged) return combined def _hc_death_overlay_int(value): try: if value is None or value == "": return None return int(value) except Exception: return None def _hc_canonical_accounts_root(): import os from pathlib import Path return Path(os.environ.get( "PSOPEEPS_HC_CANONICAL_ACCOUNTS_ROOT", "/home/rbatty/.local/share/psopeeps_account_sync/canonical/accounts", )) def _hc_canonical_slot_is_dead(guild_card, character_slot): import json account_id = _hc_death_overlay_int(guild_card) slot = _hc_death_overlay_int(character_slot) if account_id is None or slot is None: return False players_dir = _hc_canonical_accounts_root() / f"{account_id:010d}" / "system" / "players" if not players_dir.exists(): return False if any(players_dir.glob(f"player_*_{slot}.psochar.hardcore-dead")): return True deaths_log = players_dir / "hardcore-deaths.jsonl" if not deaths_log.exists(): return False suffix = f"_{slot}.psochar" try: lines = deaths_log.read_text(errors="replace").splitlines() except Exception: return False for line in lines: line = line.strip() if not line: continue try: entry = json.loads(line) except Exception: continue if _hc_death_overlay_int(entry.get("account_id")) != account_id: continue if str(entry.get("character_file") or "").endswith(suffix): return True return False def _hc_apply_canonical_death_overlay(row): if _hc_canonical_slot_is_dead( row.get("guild_card") or row.get("account_id"), row.get("character_slot"), ): row["alive"] = False row.setdefault("dead_at", "canonical-hardcore-dead") return row def _hc_combined_payload(): source_rows, errors = _hc_get_source_characters() combined = _hc_merge_character_rows(source_rows) return combined, errors @app.get("/hardcore/sources") def hardcore_sources(): statuses = [] for source_name, base_url in _hc_source_urls(): status = { "source_ship": source_name, "base_url": base_url, "ok": False, "health": None, "error": None, } try: status["health"] = _hc_fetch_json(f"{base_url}/health") status["ok"] = bool(status["health"].get("ok")) if isinstance(status["health"], dict) else True except Exception as e: status["error"] = str(e) statuses.append(status) return jsonify({ "ok": all(s["ok"] for s in statuses) if statuses else False, "sources": statuses, }) @app.get("/hardcore/characters") def hardcore_characters_combined(): combined, errors = _hc_combined_payload() combined.sort( key=lambda r: ( _hc_int(r.get("total_exp")), _hc_int(r.get("level")), _hc_int(r.get("total_enemies_killed")), ), reverse=True, ) return jsonify({ "ok": not errors, "errors": errors, "count": len(combined), "characters": combined, }) @app.get("/hardcore/leaderboard/kills") def hardcore_leaderboard_kills_combined(): combined, errors = _hc_combined_payload() combined.sort( key=lambda r: ( _hc_int(r.get("total_enemies_killed")), _hc_int(r.get("total_exp")), _hc_int(r.get("level")), ), reverse=True, ) return jsonify(combined[:100]) @app.get("/hardcore/leaderboard/level") def hardcore_leaderboard_level_combined(): combined, errors = _hc_combined_payload() combined.sort( key=lambda r: ( _hc_int(r.get("level")), _hc_int(r.get("total_exp")), _hc_int(r.get("total_enemies_killed")), ), reverse=True, ) return jsonify(combined[:100]) @app.get("/hardcore/leaderboard/exp") def hardcore_leaderboard_exp_combined(): combined, errors = _hc_combined_payload() combined.sort( key=lambda r: ( _hc_int(r.get("total_exp")), _hc_int(r.get("level")), _hc_int(r.get("total_enemies_killed")), ), reverse=True, ) return jsonify(combined[:100]) def _hc_points_row(row): level = _hc_int(row.get("level")) total_exp = _hc_int(row.get("total_exp")) kills = _hc_int(row.get("total_enemies_killed")) play_time = _hc_int(row.get("play_time_seconds")) hours_played = play_time / 3600 if play_time > 0 else 0 level_points = (level * 1000) + (level * level * 10) exp_points = int(total_exp / 1000) kill_points = int(100 * (kills ** 0.5)) survival_time_points = min(10000, int(250 * (hours_played ** 0.5))) challenge_mode_points = 0 total_points_raw = level_points + exp_points + kill_points + survival_time_points + challenge_mode_points total_points = int(total_points_raw / 100) player_name = row.get("character_name") or row.get("name") or "" return { "PlayerName": player_name, "CharacterName": player_name, "Points": total_points, "TotalPoints": total_points, "Class": row.get("character_class") or "", "SecID": row.get("section_id") or "", "LevelPoints": level_points, "EXPPoints": exp_points, "KillPoints": kill_points, "SurvivalTimePoints": survival_time_points, "ChallengeModePoints": challenge_mode_points, "Level": level, "TotalEXP": total_exp, "Kills": kills, "TotalKills": kills, "PlayTimeSeconds": play_time, "Alive": _hc_bool(row.get("alive"), True), } @app.get("/hardcore/leaderboard/points") def hardcore_leaderboard_points_combined(): combined, errors = _hc_combined_payload() rows = [_hc_points_row(r) for r in combined] rows.sort(key=lambda r: r["TotalPoints"], reverse=True) return jsonify(rows[:100]) # --- end Hardcore stats aggregator ------------------------------------------- @app.get("/server-status") @app.get("/api/server-status") def public_server_status(): import json import os import urllib.parse import urllib.request prometheus_url = os.environ.get("PROMETHEUS_URL", "http://5.0.0.20:9090").rstrip("/") def prom_value(query): url = prometheus_url + "/api/v1/query?" + urllib.parse.urlencode({"query": query}) try: with urllib.request.urlopen(url, timeout=2.5) as resp: data = json.loads(resp.read().decode("utf-8")) except Exception: return 0 if data.get("status") != "success": return 0 total = 0.0 for result in data.get("data", {}).get("result", []): value = result.get("value", [None, "0"])[1] try: total += float(value) except (TypeError, ValueError): pass return int(total) def q(metric, labels): label_text = ",".join(f'{k}="{v}"' for k, v in labels.items()) return f'sum({metric}{{{label_text}}}) or vector(0)' def newserv(region, service, ship, version): return prom_value(q("pso_newserv_clients_connected", { "region": region, "service": service, "ship": ship, "version": version, })) def adhoc(region, game): return prom_value(q("psppeeps_adhoc_connected_clients_by_product", { "region": region, "service": f"{region}-psppeeps-adhoc", "ship": "psp", "game": game, })) us = { "alis_v2": newserv("us", "us-newserv-live", "live", "v2"), "alis_v3": newserv("us", "us-newserv-live", "live", "v3"), "alis_bb": newserv("us", "us-newserv-live", "live", "v4"), "abion_hcbb": newserv("us", "us-newserv-hardcore", "hardcore", "v4"), "adhoc_psp1": adhoc("us", "psp1"), "adhoc_psp2i": adhoc("us", "psp2i"), } eu = { "palma_v2": newserv("eu", "eu-newserv-live", "live", "v2"), "palma_v3": newserv("eu", "eu-newserv-live", "live", "v3"), "palma_bb": newserv("eu", "eu-newserv-live", "live", "v4"), "aiedo_hcbb": newserv("eu", "eu-newserv-hardcore", "hardcore", "v4"), "adhoc_psp1": adhoc("eu", "psp1"), "adhoc_psp2i": adhoc("eu", "psp2i"), } us_total = sum(us.values()) eu_total = sum(eu.values()) return jsonify({ "servers": [ { "label": "US Server", "players": us_total, "ships": [ {"label": "Alis", "rows": [ {"label": "V2", "players": us["alis_v2"]}, {"label": "V3", "players": us["alis_v3"]}, {"label": "BB", "players": us["alis_bb"]}, ]}, {"label": "Abion", "rows": [ {"label": "HC/BB", "players": us["abion_hcbb"]}, ]}, {"label": "AdHoc-US", "rows": [ {"label": "PSP1", "players": us["adhoc_psp1"]}, {"label": "PSP2i", "players": us["adhoc_psp2i"]}, ]}, ], }, { "label": "EU Server", "players": eu_total, "ships": [ {"label": "Palma", "rows": [ {"label": "V2", "players": eu["palma_v2"]}, {"label": "V3", "players": eu["palma_v3"]}, {"label": "BB", "players": eu["palma_bb"]}, ]}, {"label": "Aiedo", "rows": [ {"label": "HC/BB", "players": eu["aiedo_hcbb"]}, ]}, {"label": "AdHoc-EU", "rows": [ {"label": "PSP1", "players": eu["adhoc_psp1"]}, {"label": "PSP2i", "players": eu["adhoc_psp2i"]}, ]}, ], }, ], })