From 5bec2eb3c7bdb49194ddf41ebc363d7022b42221 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 01:56:48 -0400 Subject: [PATCH 1/8] Add newserv account lock acquire endpoint --- backend/app.py | 267 +++++++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 2 + 2 files changed, 269 insertions(+) diff --git a/backend/app.py b/backend/app.py index cbb6d37..396117c 100644 --- a/backend/app.py +++ b/backend/app.py @@ -11,6 +11,7 @@ from email.message import EmailMessage from urllib.parse import urlencode import psycopg +from psycopg.types.json import Jsonb from flask import Flask, jsonify, make_response, request, redirect from werkzeug.security import check_password_hash, generate_password_hash @@ -107,6 +108,29 @@ def init_db(): updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """) + + cur.execute(""" + CREATE TABLE IF NOT EXISTS account_session_locks ( + account_id BIGINT PRIMARY KEY, + holder_source TEXT NOT NULL, + source_region TEXT NOT NULL DEFAULT '', + source_ship TEXT NOT NULL DEFAULT '', + account_store TEXT NOT NULL DEFAULT 'shared', + state TEXT NOT NULL DEFAULT 'active', + sessions JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + expires_at TIMESTAMPTZ NOT NULL + ) + """) + cur.execute(""" + CREATE INDEX IF NOT EXISTS account_session_locks_expires_at_idx + ON account_session_locks(expires_at) + """) + cur.execute(""" + CREATE INDEX IF NOT EXISTS account_session_locks_holder_source_idx + ON account_session_locks(holder_source) + """) return except Exception as e: last_err = e @@ -523,6 +547,249 @@ def health(): return jsonify({"ok": True}) +# --- newserv account login lock coordinator ----------------------------------- + +def account_lock_lease_seconds(): + return int(os.environ.get("ACCOUNT_LOCK_LEASE_SECONDS", "7200")) + + +def account_lock_expires_at(): + return utcnow() + timedelta(seconds=account_lock_lease_seconds()) + + +def require_newserv_shared_secret(): + expected = ( + os.environ.get("ACCOUNT_SYNC_SHARED_SECRET") + or os.environ.get("NEWSERV_SHARED_SECRET") + or "" + ) + if not expected: + return jsonify({"ok": False, "error": "newserv account sync API is disabled"}), 403 + + got = request.headers.get("X-Psopeeps-Admin-Secret", "") + if not secrets.compare_digest(got, expected): + return jsonify({"ok": False, "error": "forbidden"}), 403 + + return None + + +def clean_lock_account_id(value): + try: + if isinstance(value, str): + value = value.strip() + if not value: + return None + return int(value, 10) + return int(value) + except Exception: + return None + + +def clean_lock_text(value, max_len=128): + value = str(value or "").strip() + if len(value) > max_len: + value = value[:max_len] + return value + + +def lock_row_payload(row): + if not row: + return None + + sessions = row.get("sessions") or {} + if isinstance(sessions, str): + try: + sessions = json.loads(sessions) + except Exception: + sessions = {} + + return { + "account_id": account_id_str(row["account_id"]), + "holder_source": row["holder_source"], + "source_region": row.get("source_region") or "", + "source_ship": row.get("source_ship") or "", + "account_store": row.get("account_store") or "", + "state": row.get("state") or "active", + "sessions": sessions, + "session_count": len(sessions), + "created_at": row["created_at"].isoformat() if row.get("created_at") else None, + "updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None, + "expires_at": row["expires_at"].isoformat() if row.get("expires_at") else None, + } + + +@app.post("/api/newserv/account-locks/acquire") +def newserv_account_lock_acquire(): + auth_error = require_newserv_shared_secret() + if auth_error: + return auth_error + + data = json_body() + + account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str")) + source = clean_lock_text(data.get("source"), 64) + source_region = clean_lock_text(data.get("source_region"), 32) + source_ship = clean_lock_text(data.get("source_ship"), 32) + account_store = clean_lock_text(data.get("account_store") or "shared", 32) + version = clean_lock_text(data.get("version"), 64) + session_nonce = clean_lock_text(data.get("session_nonce"), 160) + + if not account_id: + return jsonify({"ok": False, "message": "missing account_id"}), 400 + if not source: + return jsonify({"ok": False, "message": "missing source"}), 400 + if not session_nonce: + return jsonify({"ok": False, "message": "missing session_nonce"}), 400 + + now = utcnow() + expires_at = account_lock_expires_at() + + session_info = { + "source": source, + "source_region": source_region, + "source_ship": source_ship, + "account_store": account_store, + "version": version, + "created_at": now.isoformat(), + "updated_at": now.isoformat(), + } + + with connect() as conn: + with conn.transaction(): + with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: + cur.execute(""" + SELECT account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + FROM account_session_locks + WHERE account_id = %s + FOR UPDATE + """, (account_id,)) + row = cur.fetchone() + + if row and row["expires_at"] and row["expires_at"] <= now: + cur.execute(""" + DELETE FROM account_session_locks + WHERE account_id = %s + """, (account_id,)) + row = None + + if not row: + sessions = {session_nonce: session_info} + cur.execute(""" + INSERT INTO account_session_locks ( + account_id, holder_source, source_region, source_ship, + account_store, state, sessions, expires_at + ) + VALUES (%s, %s, %s, %s, %s, 'active', %s, %s) + RETURNING account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + """, ( + account_id, + source, + source_region, + source_ship, + account_store, + Jsonb(sessions), + expires_at, + )) + row = cur.fetchone() + return jsonify({ + "ok": True, + "session_nonce": session_nonce, + "state": row["state"], + "holder_source": row["holder_source"], + "lock": lock_row_payload(row), + }) + + if row["holder_source"] != source: + return jsonify({ + "ok": False, + "message": f"$C6Account is already active\\non {row['holder_source']}.", + "holder_source": row["holder_source"], + "state": row["state"], + "lock": lock_row_payload(row), + }) + + sessions = row["sessions"] or {} + if isinstance(sessions, str): + sessions = json.loads(sessions) + sessions[session_nonce] = session_info + + cur.execute(""" + UPDATE account_session_locks + SET state = 'active', + source_region = %s, + source_ship = %s, + account_store = %s, + sessions = %s, + updated_at = now(), + expires_at = %s + WHERE account_id = %s + RETURNING account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + """, ( + source_region, + source_ship, + account_store, + Jsonb(sessions), + expires_at, + account_id, + )) + row = cur.fetchone() + + return jsonify({ + "ok": True, + "session_nonce": session_nonce, + "state": row["state"], + "holder_source": row["holder_source"], + "lock": lock_row_payload(row), + }) + + +@app.get("/api/newserv/account-locks/") +def newserv_account_lock_status(account_id): + auth_error = require_newserv_shared_secret() + if auth_error: + return auth_error + + with connect() as conn: + with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: + cur.execute(""" + SELECT account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + FROM account_session_locks + WHERE account_id = %s + """, (account_id,)) + row = cur.fetchone() + + return jsonify({ + "ok": True, + "account_id": account_id_str(account_id), + "locked": bool(row), + "lock": lock_row_payload(row), + }) + + +@app.post("/api/newserv/account-locks//admin-unlock") +def newserv_account_lock_admin_unlock(account_id): + auth_error = require_newserv_shared_secret() + if auth_error: + return auth_error + + with connect() as conn: + with conn.cursor() as cur: + cur.execute(""" + DELETE FROM account_session_locks + WHERE account_id = %s + """, (account_id,)) + + return jsonify({ + "ok": True, + "account_id": account_id_str(account_id), + "unlocked": True, + }) + + @app.get("/me") def me(): user = current_user() diff --git a/docker-compose.yml b/docker-compose.yml index 12fc85b..384a90a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,8 @@ services: SESSION_COOKIE_SAMESITE: ${SESSION_COOKIE_SAMESITE:-Lax} SESSION_DAYS: ${SESSION_DAYS:-30} ACCOUNT_SYNC_ROOT: /account-sync + ACCOUNT_SYNC_SHARED_SECRET: ${ACCOUNT_SYNC_SHARED_SECRET:-} + ACCOUNT_LOCK_LEASE_SECONDS: ${ACCOUNT_LOCK_LEASE_SECONDS:-7200} PUBLIC_BASE_URL: ${PUBLIC_BASE_URL:-https://psopeeps.online} SMTP_HOST: ${SMTP_HOST:-} SMTP_PORT: ${SMTP_PORT:-587} -- 2.52.0 From f8f514e9847df561e3d659c19011db7552d6e962 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 01:58:50 -0400 Subject: [PATCH 2/8] Add newserv account lock heartbeat endpoint --- backend/app.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/backend/app.py b/backend/app.py index 396117c..2ae82ff 100644 --- a/backend/app.py +++ b/backend/app.py @@ -746,6 +746,44 @@ def newserv_account_lock_acquire(): }) +@app.post("/api/newserv/account-locks/heartbeat") +def newserv_account_lock_heartbeat(): + auth_error = require_newserv_shared_secret() + if auth_error: + return auth_error + + data = json_body() + source = clean_lock_text(data.get("source"), 64) + source_region = clean_lock_text(data.get("source_region"), 32) + source_ship = clean_lock_text(data.get("source_ship"), 32) + + if not source: + return jsonify({"ok": False, "message": "missing source"}), 400 + + expires_at = account_lock_expires_at() + + with connect() as conn: + with conn.cursor() as cur: + cur.execute(""" + UPDATE account_session_locks + SET updated_at = now(), + expires_at = %s + WHERE holder_source = %s + AND state IN ('active', 'draining') + """, (expires_at, source)) + refreshed = cur.rowcount + + return jsonify({ + "ok": True, + "source": source, + "source_region": source_region, + "source_ship": source_ship, + "refreshed": refreshed, + "expires_at": expires_at.isoformat(), + }) + + + @app.get("/api/newserv/account-locks/") def newserv_account_lock_status(account_id): auth_error = require_newserv_shared_secret() -- 2.52.0 From 2bd09c5d5d5c1a8815aa9b616c0736f69286bcc8 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 02:09:05 -0400 Subject: [PATCH 3/8] Add newserv account lock session end endpoint --- backend/app.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/backend/app.py b/backend/app.py index 2ae82ff..88a2b60 100644 --- a/backend/app.py +++ b/backend/app.py @@ -784,6 +784,130 @@ def newserv_account_lock_heartbeat(): +@app.post("/api/newserv/account-locks/session-end") +def newserv_account_lock_session_end(): + auth_error = require_newserv_shared_secret() + if auth_error: + return auth_error + + data = json_body() + + account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str")) + source = clean_lock_text(data.get("source"), 64) + session_nonce = clean_lock_text(data.get("session_nonce"), 160) + version = clean_lock_text(data.get("version"), 64) + + if not account_id: + return jsonify({"ok": False, "message": "missing account_id"}), 400 + if not source: + return jsonify({"ok": False, "message": "missing source"}), 400 + if not session_nonce: + return jsonify({"ok": False, "message": "missing session_nonce"}), 400 + + now = utcnow() + expires_at = account_lock_expires_at() + + with connect() as conn: + with conn.transaction(): + with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: + cur.execute(""" + SELECT account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + FROM account_session_locks + WHERE account_id = %s + FOR UPDATE + """, (account_id,)) + row = cur.fetchone() + + if not row: + return jsonify({ + "ok": True, + "account_id": account_id_str(account_id), + "source": source, + "session_nonce": session_nonce, + "version": version, + "removed": False, + "state": "unlocked", + "remaining_sessions": 0, + "message": "no active lock", + }) + + if row["expires_at"] and row["expires_at"] <= now: + cur.execute(""" + DELETE FROM account_session_locks + WHERE account_id = %s + """, (account_id,)) + return jsonify({ + "ok": True, + "account_id": account_id_str(account_id), + "source": source, + "session_nonce": session_nonce, + "version": version, + "removed": False, + "state": "expired", + "remaining_sessions": 0, + "message": "expired lock removed", + }) + + if row["holder_source"] != source: + return jsonify({ + "ok": False, + "account_id": account_id_str(account_id), + "source": source, + "session_nonce": session_nonce, + "holder_source": row["holder_source"], + "state": row["state"], + "message": "session source does not hold this lock", + "lock": lock_row_payload(row), + }), 409 + + sessions = row["sessions"] or {} + if isinstance(sessions, str): + sessions = json.loads(sessions) + + removed = session_nonce in sessions + if removed: + sessions.pop(session_nonce, None) + + if sessions: + cur.execute(""" + UPDATE account_session_locks + SET state = 'active', + sessions = %s, + updated_at = now(), + expires_at = %s + WHERE account_id = %s + RETURNING account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + """, (Jsonb(sessions), expires_at, account_id)) + row = cur.fetchone() + else: + cur.execute(""" + UPDATE account_session_locks + SET state = 'draining', + sessions = '{}'::jsonb, + updated_at = now(), + expires_at = %s + WHERE account_id = %s + RETURNING account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + """, (expires_at, account_id)) + row = cur.fetchone() + + return jsonify({ + "ok": True, + "account_id": account_id_str(account_id), + "source": source, + "session_nonce": session_nonce, + "version": version, + "removed": removed, + "state": row["state"], + "remaining_sessions": len(row["sessions"] or {}), + "lock": lock_row_payload(row), + }) + + + @app.get("/api/newserv/account-locks/") def newserv_account_lock_status(account_id): auth_error = require_newserv_shared_secret() -- 2.52.0 From c9fe7a3bd99c380c0f54a7761ebfdc1860f5d53e Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 02:36:41 -0400 Subject: [PATCH 4/8] Add safe reaping for draining account locks --- backend/app.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/backend/app.py b/backend/app.py index 88a2b60..3bf5db0 100644 --- a/backend/app.py +++ b/backend/app.py @@ -484,6 +484,12 @@ def bb_sync_info(account_id): } + +def account_sync_current_on_all_regions(account_id): + sync = bb_sync_info(account_id) + return sync.get("status") == "current", sync + + def bb_payload(account_id, username): sync = bb_sync_info(account_id) return { @@ -908,6 +914,60 @@ def newserv_account_lock_session_end(): +@app.post("/api/newserv/account-locks/reap-draining") +def newserv_account_lock_reap_draining(): + auth_error = require_newserv_shared_secret() + if auth_error: + return auth_error + + reaped = [] + kept = [] + + with connect() as conn: + with conn.transaction(): + with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: + cur.execute(""" + SELECT account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + FROM account_session_locks + WHERE state = 'draining' + ORDER BY updated_at ASC + FOR UPDATE + """) + rows = list(cur.fetchall()) + + for row in rows: + account_id = row["account_id"] + current, sync = account_sync_current_on_all_regions(account_id) + + if current: + cur.execute(""" + DELETE FROM account_session_locks + WHERE account_id = %s + """, (account_id,)) + reaped.append({ + "account_id": account_id_str(account_id), + "holder_source": row["holder_source"], + "sync_status": sync.get("status"), + }) + else: + kept.append({ + "account_id": account_id_str(account_id), + "holder_source": row["holder_source"], + "sync_status": sync.get("status"), + "regions": sync.get("regions"), + }) + + return jsonify({ + "ok": True, + "reaped": reaped, + "kept": kept, + "reaped_count": len(reaped), + "kept_count": len(kept), + }) + + + @app.get("/api/newserv/account-locks/") def newserv_account_lock_status(account_id): auth_error = require_newserv_shared_secret() -- 2.52.0 From b5551dc1b660332cb2442bdb986db64296797f81 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 02:37:43 -0400 Subject: [PATCH 5/8] Reap draining account locks during heartbeat --- backend/app.py | 49 ++++++++++++------------------------------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/backend/app.py b/backend/app.py index 3bf5db0..aae961a 100644 --- a/backend/app.py +++ b/backend/app.py @@ -779,6 +779,8 @@ def newserv_account_lock_heartbeat(): """, (expires_at, source)) refreshed = cur.rowcount + reaped, kept = reap_draining_account_locks() + return jsonify({ "ok": True, "source": source, @@ -786,6 +788,8 @@ def newserv_account_lock_heartbeat(): "source_ship": source_ship, "refreshed": refreshed, "expires_at": expires_at.isoformat(), + "reaped_draining_count": len(reaped), + "kept_draining_count": len(kept), }) @@ -914,49 +918,20 @@ def newserv_account_lock_session_end(): +def reap_draining_account_locks(): + reaped, kept = reap_draining_account_locks() + + return reaped, kept + + + @app.post("/api/newserv/account-locks/reap-draining") def newserv_account_lock_reap_draining(): auth_error = require_newserv_shared_secret() if auth_error: return auth_error - reaped = [] - kept = [] - - with connect() as conn: - with conn.transaction(): - with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: - cur.execute(""" - SELECT account_id, holder_source, source_region, source_ship, - account_store, state, sessions, created_at, updated_at, expires_at - FROM account_session_locks - WHERE state = 'draining' - ORDER BY updated_at ASC - FOR UPDATE - """) - rows = list(cur.fetchall()) - - for row in rows: - account_id = row["account_id"] - current, sync = account_sync_current_on_all_regions(account_id) - - if current: - cur.execute(""" - DELETE FROM account_session_locks - WHERE account_id = %s - """, (account_id,)) - reaped.append({ - "account_id": account_id_str(account_id), - "holder_source": row["holder_source"], - "sync_status": sync.get("status"), - }) - else: - kept.append({ - "account_id": account_id_str(account_id), - "holder_source": row["holder_source"], - "sync_status": sync.get("status"), - "regions": sync.get("regions"), - }) + reaped, kept = reap_draining_account_locks() return jsonify({ "ok": True, -- 2.52.0 From 54d328209f06c237054fa79393f1c97e66b72fbb Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 02:46:52 -0400 Subject: [PATCH 6/8] Expose coordinator app on Nebula port 8080 --- docker-compose.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 384a90a..1d8f7fa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,8 @@ services: restart: unless-stopped env_file: - .env + ports: + - "5.0.0.21:8080:8000" environment: APP_PORT: ${APP_PORT:-8000} POSTGRES_HOST: postgres -- 2.52.0 From e7fd22f89699b0831f93c83dd9c9d2c4363cbfb9 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 03:01:40 -0400 Subject: [PATCH 7/8] Fix coordinator port target --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1d8f7fa..8621564 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,7 +28,7 @@ services: env_file: - .env ports: - - "5.0.0.21:8080:8000" + - "5.0.0.21:8080:${APP_PORT:-3000}" environment: APP_PORT: ${APP_PORT:-8000} POSTGRES_HOST: postgres -- 2.52.0 From 0b5fe411a0a5eaf8c35e33501fc4ef12ad9f6837 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 09:52:42 -0400 Subject: [PATCH 8/8] Fix draining account lock reaper recursion --- backend/app.py | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/backend/app.py b/backend/app.py index aae961a..faa63e7 100644 --- a/backend/app.py +++ b/backend/app.py @@ -918,13 +918,49 @@ def newserv_account_lock_session_end(): + def reap_draining_account_locks(): - reaped, kept = reap_draining_account_locks() + reaped = [] + kept = [] + + with connect() as conn: + with conn.transaction(): + with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: + cur.execute(""" + SELECT account_id, holder_source, source_region, source_ship, + account_store, state, sessions, created_at, updated_at, expires_at + FROM account_session_locks + WHERE state = 'draining' + ORDER BY updated_at ASC + FOR UPDATE + """) + rows = list(cur.fetchall()) + + for row in rows: + account_id = row["account_id"] + current, sync = account_sync_current_on_all_regions(account_id) + + if current: + cur.execute(""" + DELETE FROM account_session_locks + WHERE account_id = %s + """, (account_id,)) + reaped.append({ + "account_id": account_id_str(account_id), + "holder_source": row["holder_source"], + "sync_status": sync.get("status"), + }) + else: + kept.append({ + "account_id": account_id_str(account_id), + "holder_source": row["holder_source"], + "sync_status": sync.get("status"), + "regions": sync.get("regions"), + }) return reaped, kept - @app.post("/api/newserv/account-locks/reap-draining") def newserv_account_lock_reap_draining(): auth_error = require_newserv_shared_secret() -- 2.52.0