feature/account-login-lock-coordinator
Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
+500
@@ -11,6 +11,7 @@ from email.message import EmailMessage
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import psycopg
|
||||
from psycopg.types.json import Jsonb
|
||||
from flask import Flask, jsonify, make_response, request, redirect
|
||||
from werkzeug.security import check_password_hash, generate_password_hash
|
||||
|
||||
@@ -107,6 +108,29 @@ def init_db():
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)
|
||||
""")
|
||||
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS account_session_locks (
|
||||
account_id BIGINT PRIMARY KEY,
|
||||
holder_source TEXT NOT NULL,
|
||||
source_region TEXT NOT NULL DEFAULT '',
|
||||
source_ship TEXT NOT NULL DEFAULT '',
|
||||
account_store TEXT NOT NULL DEFAULT 'shared',
|
||||
state TEXT NOT NULL DEFAULT 'active',
|
||||
sessions JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
expires_at TIMESTAMPTZ NOT NULL
|
||||
)
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS account_session_locks_expires_at_idx
|
||||
ON account_session_locks(expires_at)
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS account_session_locks_holder_source_idx
|
||||
ON account_session_locks(holder_source)
|
||||
""")
|
||||
return
|
||||
except Exception as e:
|
||||
last_err = e
|
||||
@@ -460,6 +484,12 @@ def bb_sync_info(account_id):
|
||||
}
|
||||
|
||||
|
||||
|
||||
def account_sync_current_on_all_regions(account_id):
|
||||
sync = bb_sync_info(account_id)
|
||||
return sync.get("status") == "current", sync
|
||||
|
||||
|
||||
def bb_payload(account_id, username):
|
||||
sync = bb_sync_info(account_id)
|
||||
return {
|
||||
@@ -523,6 +553,476 @@ def health():
|
||||
return jsonify({"ok": True})
|
||||
|
||||
|
||||
# --- newserv account login lock coordinator -----------------------------------
|
||||
|
||||
def account_lock_lease_seconds():
|
||||
return int(os.environ.get("ACCOUNT_LOCK_LEASE_SECONDS", "7200"))
|
||||
|
||||
|
||||
def account_lock_expires_at():
|
||||
return utcnow() + timedelta(seconds=account_lock_lease_seconds())
|
||||
|
||||
|
||||
def require_newserv_shared_secret():
|
||||
expected = (
|
||||
os.environ.get("ACCOUNT_SYNC_SHARED_SECRET")
|
||||
or os.environ.get("NEWSERV_SHARED_SECRET")
|
||||
or ""
|
||||
)
|
||||
if not expected:
|
||||
return jsonify({"ok": False, "error": "newserv account sync API is disabled"}), 403
|
||||
|
||||
got = request.headers.get("X-Psopeeps-Admin-Secret", "")
|
||||
if not secrets.compare_digest(got, expected):
|
||||
return jsonify({"ok": False, "error": "forbidden"}), 403
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def clean_lock_account_id(value):
|
||||
try:
|
||||
if isinstance(value, str):
|
||||
value = value.strip()
|
||||
if not value:
|
||||
return None
|
||||
return int(value, 10)
|
||||
return int(value)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def clean_lock_text(value, max_len=128):
|
||||
value = str(value or "").strip()
|
||||
if len(value) > max_len:
|
||||
value = value[:max_len]
|
||||
return value
|
||||
|
||||
|
||||
def lock_row_payload(row):
|
||||
if not row:
|
||||
return None
|
||||
|
||||
sessions = row.get("sessions") or {}
|
||||
if isinstance(sessions, str):
|
||||
try:
|
||||
sessions = json.loads(sessions)
|
||||
except Exception:
|
||||
sessions = {}
|
||||
|
||||
return {
|
||||
"account_id": account_id_str(row["account_id"]),
|
||||
"holder_source": row["holder_source"],
|
||||
"source_region": row.get("source_region") or "",
|
||||
"source_ship": row.get("source_ship") or "",
|
||||
"account_store": row.get("account_store") or "",
|
||||
"state": row.get("state") or "active",
|
||||
"sessions": sessions,
|
||||
"session_count": len(sessions),
|
||||
"created_at": row["created_at"].isoformat() if row.get("created_at") else None,
|
||||
"updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None,
|
||||
"expires_at": row["expires_at"].isoformat() if row.get("expires_at") else None,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/api/newserv/account-locks/acquire")
|
||||
def newserv_account_lock_acquire():
|
||||
auth_error = require_newserv_shared_secret()
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
data = json_body()
|
||||
|
||||
account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str"))
|
||||
source = clean_lock_text(data.get("source"), 64)
|
||||
source_region = clean_lock_text(data.get("source_region"), 32)
|
||||
source_ship = clean_lock_text(data.get("source_ship"), 32)
|
||||
account_store = clean_lock_text(data.get("account_store") or "shared", 32)
|
||||
version = clean_lock_text(data.get("version"), 64)
|
||||
session_nonce = clean_lock_text(data.get("session_nonce"), 160)
|
||||
|
||||
if not account_id:
|
||||
return jsonify({"ok": False, "message": "missing account_id"}), 400
|
||||
if not source:
|
||||
return jsonify({"ok": False, "message": "missing source"}), 400
|
||||
if not session_nonce:
|
||||
return jsonify({"ok": False, "message": "missing session_nonce"}), 400
|
||||
|
||||
now = utcnow()
|
||||
expires_at = account_lock_expires_at()
|
||||
|
||||
session_info = {
|
||||
"source": source,
|
||||
"source_region": source_region,
|
||||
"source_ship": source_ship,
|
||||
"account_store": account_store,
|
||||
"version": version,
|
||||
"created_at": now.isoformat(),
|
||||
"updated_at": now.isoformat(),
|
||||
}
|
||||
|
||||
with connect() as conn:
|
||||
with conn.transaction():
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
FOR UPDATE
|
||||
""", (account_id,))
|
||||
row = cur.fetchone()
|
||||
|
||||
if row and row["expires_at"] and row["expires_at"] <= now:
|
||||
cur.execute("""
|
||||
DELETE FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
""", (account_id,))
|
||||
row = None
|
||||
|
||||
if not row:
|
||||
sessions = {session_nonce: session_info}
|
||||
cur.execute("""
|
||||
INSERT INTO account_session_locks (
|
||||
account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, expires_at
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, 'active', %s, %s)
|
||||
RETURNING account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
""", (
|
||||
account_id,
|
||||
source,
|
||||
source_region,
|
||||
source_ship,
|
||||
account_store,
|
||||
Jsonb(sessions),
|
||||
expires_at,
|
||||
))
|
||||
row = cur.fetchone()
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"session_nonce": session_nonce,
|
||||
"state": row["state"],
|
||||
"holder_source": row["holder_source"],
|
||||
"lock": lock_row_payload(row),
|
||||
})
|
||||
|
||||
if row["holder_source"] != source:
|
||||
return jsonify({
|
||||
"ok": False,
|
||||
"message": f"$C6Account is already active\\non {row['holder_source']}.",
|
||||
"holder_source": row["holder_source"],
|
||||
"state": row["state"],
|
||||
"lock": lock_row_payload(row),
|
||||
})
|
||||
|
||||
sessions = row["sessions"] or {}
|
||||
if isinstance(sessions, str):
|
||||
sessions = json.loads(sessions)
|
||||
sessions[session_nonce] = session_info
|
||||
|
||||
cur.execute("""
|
||||
UPDATE account_session_locks
|
||||
SET state = 'active',
|
||||
source_region = %s,
|
||||
source_ship = %s,
|
||||
account_store = %s,
|
||||
sessions = %s,
|
||||
updated_at = now(),
|
||||
expires_at = %s
|
||||
WHERE account_id = %s
|
||||
RETURNING account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
""", (
|
||||
source_region,
|
||||
source_ship,
|
||||
account_store,
|
||||
Jsonb(sessions),
|
||||
expires_at,
|
||||
account_id,
|
||||
))
|
||||
row = cur.fetchone()
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"session_nonce": session_nonce,
|
||||
"state": row["state"],
|
||||
"holder_source": row["holder_source"],
|
||||
"lock": lock_row_payload(row),
|
||||
})
|
||||
|
||||
|
||||
@app.post("/api/newserv/account-locks/heartbeat")
|
||||
def newserv_account_lock_heartbeat():
|
||||
auth_error = require_newserv_shared_secret()
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
data = json_body()
|
||||
source = clean_lock_text(data.get("source"), 64)
|
||||
source_region = clean_lock_text(data.get("source_region"), 32)
|
||||
source_ship = clean_lock_text(data.get("source_ship"), 32)
|
||||
|
||||
if not source:
|
||||
return jsonify({"ok": False, "message": "missing source"}), 400
|
||||
|
||||
expires_at = account_lock_expires_at()
|
||||
|
||||
with connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
UPDATE account_session_locks
|
||||
SET updated_at = now(),
|
||||
expires_at = %s
|
||||
WHERE holder_source = %s
|
||||
AND state IN ('active', 'draining')
|
||||
""", (expires_at, source))
|
||||
refreshed = cur.rowcount
|
||||
|
||||
reaped, kept = reap_draining_account_locks()
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"source": source,
|
||||
"source_region": source_region,
|
||||
"source_ship": source_ship,
|
||||
"refreshed": refreshed,
|
||||
"expires_at": expires_at.isoformat(),
|
||||
"reaped_draining_count": len(reaped),
|
||||
"kept_draining_count": len(kept),
|
||||
})
|
||||
|
||||
|
||||
|
||||
@app.post("/api/newserv/account-locks/session-end")
|
||||
def newserv_account_lock_session_end():
|
||||
auth_error = require_newserv_shared_secret()
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
data = json_body()
|
||||
|
||||
account_id = clean_lock_account_id(data.get("account_id") or data.get("account_id_str"))
|
||||
source = clean_lock_text(data.get("source"), 64)
|
||||
session_nonce = clean_lock_text(data.get("session_nonce"), 160)
|
||||
version = clean_lock_text(data.get("version"), 64)
|
||||
|
||||
if not account_id:
|
||||
return jsonify({"ok": False, "message": "missing account_id"}), 400
|
||||
if not source:
|
||||
return jsonify({"ok": False, "message": "missing source"}), 400
|
||||
if not session_nonce:
|
||||
return jsonify({"ok": False, "message": "missing session_nonce"}), 400
|
||||
|
||||
now = utcnow()
|
||||
expires_at = account_lock_expires_at()
|
||||
|
||||
with connect() as conn:
|
||||
with conn.transaction():
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
FOR UPDATE
|
||||
""", (account_id,))
|
||||
row = cur.fetchone()
|
||||
|
||||
if not row:
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"account_id": account_id_str(account_id),
|
||||
"source": source,
|
||||
"session_nonce": session_nonce,
|
||||
"version": version,
|
||||
"removed": False,
|
||||
"state": "unlocked",
|
||||
"remaining_sessions": 0,
|
||||
"message": "no active lock",
|
||||
})
|
||||
|
||||
if row["expires_at"] and row["expires_at"] <= now:
|
||||
cur.execute("""
|
||||
DELETE FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
""", (account_id,))
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"account_id": account_id_str(account_id),
|
||||
"source": source,
|
||||
"session_nonce": session_nonce,
|
||||
"version": version,
|
||||
"removed": False,
|
||||
"state": "expired",
|
||||
"remaining_sessions": 0,
|
||||
"message": "expired lock removed",
|
||||
})
|
||||
|
||||
if row["holder_source"] != source:
|
||||
return jsonify({
|
||||
"ok": False,
|
||||
"account_id": account_id_str(account_id),
|
||||
"source": source,
|
||||
"session_nonce": session_nonce,
|
||||
"holder_source": row["holder_source"],
|
||||
"state": row["state"],
|
||||
"message": "session source does not hold this lock",
|
||||
"lock": lock_row_payload(row),
|
||||
}), 409
|
||||
|
||||
sessions = row["sessions"] or {}
|
||||
if isinstance(sessions, str):
|
||||
sessions = json.loads(sessions)
|
||||
|
||||
removed = session_nonce in sessions
|
||||
if removed:
|
||||
sessions.pop(session_nonce, None)
|
||||
|
||||
if sessions:
|
||||
cur.execute("""
|
||||
UPDATE account_session_locks
|
||||
SET state = 'active',
|
||||
sessions = %s,
|
||||
updated_at = now(),
|
||||
expires_at = %s
|
||||
WHERE account_id = %s
|
||||
RETURNING account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
""", (Jsonb(sessions), expires_at, account_id))
|
||||
row = cur.fetchone()
|
||||
else:
|
||||
cur.execute("""
|
||||
UPDATE account_session_locks
|
||||
SET state = 'draining',
|
||||
sessions = '{}'::jsonb,
|
||||
updated_at = now(),
|
||||
expires_at = %s
|
||||
WHERE account_id = %s
|
||||
RETURNING account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
""", (expires_at, account_id))
|
||||
row = cur.fetchone()
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"account_id": account_id_str(account_id),
|
||||
"source": source,
|
||||
"session_nonce": session_nonce,
|
||||
"version": version,
|
||||
"removed": removed,
|
||||
"state": row["state"],
|
||||
"remaining_sessions": len(row["sessions"] or {}),
|
||||
"lock": lock_row_payload(row),
|
||||
})
|
||||
|
||||
|
||||
|
||||
|
||||
def reap_draining_account_locks():
|
||||
reaped = []
|
||||
kept = []
|
||||
|
||||
with connect() as conn:
|
||||
with conn.transaction():
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
FROM account_session_locks
|
||||
WHERE state = 'draining'
|
||||
ORDER BY updated_at ASC
|
||||
FOR UPDATE
|
||||
""")
|
||||
rows = list(cur.fetchall())
|
||||
|
||||
for row in rows:
|
||||
account_id = row["account_id"]
|
||||
current, sync = account_sync_current_on_all_regions(account_id)
|
||||
|
||||
if current:
|
||||
cur.execute("""
|
||||
DELETE FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
""", (account_id,))
|
||||
reaped.append({
|
||||
"account_id": account_id_str(account_id),
|
||||
"holder_source": row["holder_source"],
|
||||
"sync_status": sync.get("status"),
|
||||
})
|
||||
else:
|
||||
kept.append({
|
||||
"account_id": account_id_str(account_id),
|
||||
"holder_source": row["holder_source"],
|
||||
"sync_status": sync.get("status"),
|
||||
"regions": sync.get("regions"),
|
||||
})
|
||||
|
||||
return reaped, kept
|
||||
|
||||
|
||||
@app.post("/api/newserv/account-locks/reap-draining")
|
||||
def newserv_account_lock_reap_draining():
|
||||
auth_error = require_newserv_shared_secret()
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
reaped, kept = reap_draining_account_locks()
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"reaped": reaped,
|
||||
"kept": kept,
|
||||
"reaped_count": len(reaped),
|
||||
"kept_count": len(kept),
|
||||
})
|
||||
|
||||
|
||||
|
||||
@app.get("/api/newserv/account-locks/<int:account_id>")
|
||||
def newserv_account_lock_status(account_id):
|
||||
auth_error = require_newserv_shared_secret()
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
with connect() as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT account_id, holder_source, source_region, source_ship,
|
||||
account_store, state, sessions, created_at, updated_at, expires_at
|
||||
FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
""", (account_id,))
|
||||
row = cur.fetchone()
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"account_id": account_id_str(account_id),
|
||||
"locked": bool(row),
|
||||
"lock": lock_row_payload(row),
|
||||
})
|
||||
|
||||
|
||||
@app.post("/api/newserv/account-locks/<int:account_id>/admin-unlock")
|
||||
def newserv_account_lock_admin_unlock(account_id):
|
||||
auth_error = require_newserv_shared_secret()
|
||||
if auth_error:
|
||||
return auth_error
|
||||
|
||||
with connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
DELETE FROM account_session_locks
|
||||
WHERE account_id = %s
|
||||
""", (account_id,))
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"account_id": account_id_str(account_id),
|
||||
"unlocked": True,
|
||||
})
|
||||
|
||||
|
||||
@app.get("/me")
|
||||
def me():
|
||||
user = current_user()
|
||||
|
||||
@@ -27,6 +27,8 @@ services:
|
||||
restart: unless-stopped
|
||||
env_file:
|
||||
- .env
|
||||
ports:
|
||||
- "5.0.0.21:8080:${APP_PORT:-3000}"
|
||||
environment:
|
||||
APP_PORT: ${APP_PORT:-8000}
|
||||
POSTGRES_HOST: postgres
|
||||
@@ -36,6 +38,8 @@ services:
|
||||
SESSION_COOKIE_SAMESITE: ${SESSION_COOKIE_SAMESITE:-Lax}
|
||||
SESSION_DAYS: ${SESSION_DAYS:-30}
|
||||
ACCOUNT_SYNC_ROOT: /account-sync
|
||||
ACCOUNT_SYNC_SHARED_SECRET: ${ACCOUNT_SYNC_SHARED_SECRET:-}
|
||||
ACCOUNT_LOCK_LEASE_SECONDS: ${ACCOUNT_LOCK_LEASE_SECONDS:-7200}
|
||||
PUBLIC_BASE_URL: ${PUBLIC_BASE_URL:-https://psopeeps.online}
|
||||
SMTP_HOST: ${SMTP_HOST:-}
|
||||
SMTP_PORT: ${SMTP_PORT:-587}
|
||||
|
||||
Reference in New Issue
Block a user