feature/account-login-lock-coordinator #1

Merged
incentive merged 8 commits from feature/account-login-lock-coordinator into main 2026-06-11 12:37:29 -04:00
2 changed files with 504 additions and 0 deletions
+500
View File
@@ -11,6 +11,7 @@ from email.message import EmailMessage
from urllib.parse import urlencode from urllib.parse import urlencode
import psycopg import psycopg
from psycopg.types.json import Jsonb
from flask import Flask, jsonify, make_response, request, redirect from flask import Flask, jsonify, make_response, request, redirect
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
@@ -107,6 +108,29 @@ def init_db():
updated_at TIMESTAMPTZ NOT NULL DEFAULT now() updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
) )
""") """)
cur.execute("""
CREATE TABLE IF NOT EXISTS account_session_locks (
account_id BIGINT PRIMARY KEY,
holder_source TEXT NOT NULL,
source_region TEXT NOT NULL DEFAULT '',
source_ship TEXT NOT NULL DEFAULT '',
account_store TEXT NOT NULL DEFAULT 'shared',
state TEXT NOT NULL DEFAULT 'active',
sessions JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
expires_at TIMESTAMPTZ NOT NULL
)
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS account_session_locks_expires_at_idx
ON account_session_locks(expires_at)
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS account_session_locks_holder_source_idx
ON account_session_locks(holder_source)
""")
return return
except Exception as e: except Exception as e:
last_err = e last_err = e
@@ -460,6 +484,12 @@ def bb_sync_info(account_id):
} }
def account_sync_current_on_all_regions(account_id):
sync = bb_sync_info(account_id)
return sync.get("status") == "current", sync
def bb_payload(account_id, username): def bb_payload(account_id, username):
sync = bb_sync_info(account_id) sync = bb_sync_info(account_id)
return { return {
@@ -523,6 +553,476 @@ def health():
return jsonify({"ok": True}) return jsonify({"ok": True})
# --- newserv account login lock coordinator -----------------------------------
def account_lock_lease_seconds():
return int(os.environ.get("ACCOUNT_LOCK_LEASE_SECONDS", "7200"))
def account_lock_expires_at():
return utcnow() + timedelta(seconds=account_lock_lease_seconds())
def require_newserv_shared_secret():
expected = (
os.environ.get("ACCOUNT_SYNC_SHARED_SECRET")
or os.environ.get("NEWSERV_SHARED_SECRET")
or ""
)
if not expected:
return jsonify({"ok": False, "error": "newserv account sync API is disabled"}), 403
got = request.headers.get("X-Psopeeps-Admin-Secret", "")
if not secrets.compare_digest(got, expected):
return jsonify({"ok": False, "error": "forbidden"}), 403
return None
def clean_lock_account_id(value):
try:
if isinstance(value, str):
value = value.strip()
if not value:
return None
return int(value, 10)
return int(value)
except Exception:
return None
def clean_lock_text(value, max_len=128):
value = str(value or "").strip()
if len(value) > max_len:
value = value[:max_len]
return value
def lock_row_payload(row):
if not row:
return None
sessions = row.get("sessions") or {}
if isinstance(sessions, str):
try:
sessions = json.loads(sessions)
except Exception:
sessions = {}
return {
"account_id": account_id_str(row["account_id"]),
"holder_source": row["holder_source"],
"source_region": row.get("source_region") or "",
"source_ship": row.get("source_ship") or "",
"account_store": row.get("account_store") or "",
"state": row.get("state") or "active",
"sessions": sessions,
"session_count": len(sessions),
"created_at": row["created_at"].isoformat() if row.get("created_at") else None,
"updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None,
"expires_at": row["expires_at"].isoformat() if row.get("expires_at") else None,
}
@app.post("/api/newserv/account-locks/acquire")
def newserv_account_lock_acquire():
auth_error = require_newserv_shared_secret()
if auth_error:
return auth_error
data = json_body()
account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str"))
source = clean_lock_text(data.get("source"), 64)
source_region = clean_lock_text(data.get("source_region"), 32)
source_ship = clean_lock_text(data.get("source_ship"), 32)
account_store = clean_lock_text(data.get("account_store") or "shared", 32)
version = clean_lock_text(data.get("version"), 64)
session_nonce = clean_lock_text(data.get("session_nonce"), 160)
if not account_id:
return jsonify({"ok": False, "message": "missing account_id"}), 400
if not source:
return jsonify({"ok": False, "message": "missing source"}), 400
if not session_nonce:
return jsonify({"ok": False, "message": "missing session_nonce"}), 400
now = utcnow()
expires_at = account_lock_expires_at()
session_info = {
"source": source,
"source_region": source_region,
"source_ship": source_ship,
"account_store": account_store,
"version": version,
"created_at": now.isoformat(),
"updated_at": now.isoformat(),
}
with connect() as conn:
with conn.transaction():
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
FROM account_session_locks
WHERE account_id = %s
FOR UPDATE
""", (account_id,))
row = cur.fetchone()
if row and row["expires_at"] and row["expires_at"] <= now:
cur.execute("""
DELETE FROM account_session_locks
WHERE account_id = %s
""", (account_id,))
row = None
if not row:
sessions = {session_nonce: session_info}
cur.execute("""
INSERT INTO account_session_locks (
account_id, holder_source, source_region, source_ship,
account_store, state, sessions, expires_at
)
VALUES (%s, %s, %s, %s, %s, 'active', %s, %s)
RETURNING account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
""", (
account_id,
source,
source_region,
source_ship,
account_store,
Jsonb(sessions),
expires_at,
))
row = cur.fetchone()
return jsonify({
"ok": True,
"session_nonce": session_nonce,
"state": row["state"],
"holder_source": row["holder_source"],
"lock": lock_row_payload(row),
})
if row["holder_source"] != source:
return jsonify({
"ok": False,
"message": f"$C6Account is already active\\non {row['holder_source']}.",
"holder_source": row["holder_source"],
"state": row["state"],
"lock": lock_row_payload(row),
})
sessions = row["sessions"] or {}
if isinstance(sessions, str):
sessions = json.loads(sessions)
sessions[session_nonce] = session_info
cur.execute("""
UPDATE account_session_locks
SET state = 'active',
source_region = %s,
source_ship = %s,
account_store = %s,
sessions = %s,
updated_at = now(),
expires_at = %s
WHERE account_id = %s
RETURNING account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
""", (
source_region,
source_ship,
account_store,
Jsonb(sessions),
expires_at,
account_id,
))
row = cur.fetchone()
return jsonify({
"ok": True,
"session_nonce": session_nonce,
"state": row["state"],
"holder_source": row["holder_source"],
"lock": lock_row_payload(row),
})
@app.post("/api/newserv/account-locks/heartbeat")
def newserv_account_lock_heartbeat():
auth_error = require_newserv_shared_secret()
if auth_error:
return auth_error
data = json_body()
source = clean_lock_text(data.get("source"), 64)
source_region = clean_lock_text(data.get("source_region"), 32)
source_ship = clean_lock_text(data.get("source_ship"), 32)
if not source:
return jsonify({"ok": False, "message": "missing source"}), 400
expires_at = account_lock_expires_at()
with connect() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE account_session_locks
SET updated_at = now(),
expires_at = %s
WHERE holder_source = %s
AND state IN ('active', 'draining')
""", (expires_at, source))
refreshed = cur.rowcount
reaped, kept = reap_draining_account_locks()
return jsonify({
"ok": True,
"source": source,
"source_region": source_region,
"source_ship": source_ship,
"refreshed": refreshed,
"expires_at": expires_at.isoformat(),
"reaped_draining_count": len(reaped),
"kept_draining_count": len(kept),
})
@app.post("/api/newserv/account-locks/session-end")
def newserv_account_lock_session_end():
auth_error = require_newserv_shared_secret()
if auth_error:
return auth_error
data = json_body()
account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str"))
source = clean_lock_text(data.get("source"), 64)
session_nonce = clean_lock_text(data.get("session_nonce"), 160)
version = clean_lock_text(data.get("version"), 64)
if not account_id:
return jsonify({"ok": False, "message": "missing account_id"}), 400
if not source:
return jsonify({"ok": False, "message": "missing source"}), 400
if not session_nonce:
return jsonify({"ok": False, "message": "missing session_nonce"}), 400
now = utcnow()
expires_at = account_lock_expires_at()
with connect() as conn:
with conn.transaction():
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
FROM account_session_locks
WHERE account_id = %s
FOR UPDATE
""", (account_id,))
row = cur.fetchone()
if not row:
return jsonify({
"ok": True,
"account_id": account_id_str(account_id),
"source": source,
"session_nonce": session_nonce,
"version": version,
"removed": False,
"state": "unlocked",
"remaining_sessions": 0,
"message": "no active lock",
})
if row["expires_at"] and row["expires_at"] <= now:
cur.execute("""
DELETE FROM account_session_locks
WHERE account_id = %s
""", (account_id,))
return jsonify({
"ok": True,
"account_id": account_id_str(account_id),
"source": source,
"session_nonce": session_nonce,
"version": version,
"removed": False,
"state": "expired",
"remaining_sessions": 0,
"message": "expired lock removed",
})
if row["holder_source"] != source:
return jsonify({
"ok": False,
"account_id": account_id_str(account_id),
"source": source,
"session_nonce": session_nonce,
"holder_source": row["holder_source"],
"state": row["state"],
"message": "session source does not hold this lock",
"lock": lock_row_payload(row),
}), 409
sessions = row["sessions"] or {}
if isinstance(sessions, str):
sessions = json.loads(sessions)
removed = session_nonce in sessions
if removed:
sessions.pop(session_nonce, None)
if sessions:
cur.execute("""
UPDATE account_session_locks
SET state = 'active',
sessions = %s,
updated_at = now(),
expires_at = %s
WHERE account_id = %s
RETURNING account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
""", (Jsonb(sessions), expires_at, account_id))
row = cur.fetchone()
else:
cur.execute("""
UPDATE account_session_locks
SET state = 'draining',
sessions = '{}'::jsonb,
updated_at = now(),
expires_at = %s
WHERE account_id = %s
RETURNING account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
""", (expires_at, account_id))
row = cur.fetchone()
return jsonify({
"ok": True,
"account_id": account_id_str(account_id),
"source": source,
"session_nonce": session_nonce,
"version": version,
"removed": removed,
"state": row["state"],
"remaining_sessions": len(row["sessions"] or {}),
"lock": lock_row_payload(row),
})
def reap_draining_account_locks():
reaped = []
kept = []
with connect() as conn:
with conn.transaction():
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
FROM account_session_locks
WHERE state = 'draining'
ORDER BY updated_at ASC
FOR UPDATE
""")
rows = list(cur.fetchall())
for row in rows:
account_id = row["account_id"]
current, sync = account_sync_current_on_all_regions(account_id)
if current:
cur.execute("""
DELETE FROM account_session_locks
WHERE account_id = %s
""", (account_id,))
reaped.append({
"account_id": account_id_str(account_id),
"holder_source": row["holder_source"],
"sync_status": sync.get("status"),
})
else:
kept.append({
"account_id": account_id_str(account_id),
"holder_source": row["holder_source"],
"sync_status": sync.get("status"),
"regions": sync.get("regions"),
})
return reaped, kept
@app.post("/api/newserv/account-locks/reap-draining")
def newserv_account_lock_reap_draining():
auth_error = require_newserv_shared_secret()
if auth_error:
return auth_error
reaped, kept = reap_draining_account_locks()
return jsonify({
"ok": True,
"reaped": reaped,
"kept": kept,
"reaped_count": len(reaped),
"kept_count": len(kept),
})
@app.get("/api/newserv/account-locks/<int:account_id>")
def newserv_account_lock_status(account_id):
auth_error = require_newserv_shared_secret()
if auth_error:
return auth_error
with connect() as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT account_id, holder_source, source_region, source_ship,
account_store, state, sessions, created_at, updated_at, expires_at
FROM account_session_locks
WHERE account_id = %s
""", (account_id,))
row = cur.fetchone()
return jsonify({
"ok": True,
"account_id": account_id_str(account_id),
"locked": bool(row),
"lock": lock_row_payload(row),
})
@app.post("/api/newserv/account-locks/<int:account_id>/admin-unlock")
def newserv_account_lock_admin_unlock(account_id):
auth_error = require_newserv_shared_secret()
if auth_error:
return auth_error
with connect() as conn:
with conn.cursor() as cur:
cur.execute("""
DELETE FROM account_session_locks
WHERE account_id = %s
""", (account_id,))
return jsonify({
"ok": True,
"account_id": account_id_str(account_id),
"unlocked": True,
})
@app.get("/me") @app.get("/me")
def me(): def me():
user = current_user() user = current_user()
+4
View File
@@ -27,6 +27,8 @@ services:
restart: unless-stopped restart: unless-stopped
env_file: env_file:
- .env - .env
ports:
- "5.0.0.21:8080:${APP_PORT:-3000}"
environment: environment:
APP_PORT: ${APP_PORT:-8000} APP_PORT: ${APP_PORT:-8000}
POSTGRES_HOST: postgres POSTGRES_HOST: postgres
@@ -36,6 +38,8 @@ services:
SESSION_COOKIE_SAMESITE: ${SESSION_COOKIE_SAMESITE:-Lax} SESSION_COOKIE_SAMESITE: ${SESSION_COOKIE_SAMESITE:-Lax}
SESSION_DAYS: ${SESSION_DAYS:-30} SESSION_DAYS: ${SESSION_DAYS:-30}
ACCOUNT_SYNC_ROOT: /account-sync ACCOUNT_SYNC_ROOT: /account-sync
ACCOUNT_SYNC_SHARED_SECRET: ${ACCOUNT_SYNC_SHARED_SECRET:-}
ACCOUNT_LOCK_LEASE_SECONDS: ${ACCOUNT_LOCK_LEASE_SECONDS:-7200}
PUBLIC_BASE_URL: ${PUBLIC_BASE_URL:-https://psopeeps.online} PUBLIC_BASE_URL: ${PUBLIC_BASE_URL:-https://psopeeps.online}
SMTP_HOST: ${SMTP_HOST:-} SMTP_HOST: ${SMTP_HOST:-}
SMTP_PORT: ${SMTP_PORT:-587} SMTP_PORT: ${SMTP_PORT:-587}