import json import re import os from pathlib import Path import psycopg SUPPORTED_KEY_VERSIONS = { "dc_v2": {"label": "DC V2"}, "pc_v2": {"label": "PC V2"}, "gc_v3": {"label": "GC V3"}, } def register_key_routes( app, *, connect, current_user, jsonify, request, account_id_str, bb_account_id, canonical_license_path, refresh_account_manifest, enqueue_account_sync, bb_sync_info, ): def ensure_key_profile_table(): with connect() as conn: with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS v2_v3_key_profiles ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL REFERENCES site_users(id) ON DELETE CASCADE, account_id BIGINT NOT NULL, game_version TEXT NOT NULL, label TEXT NOT NULL DEFAULT '', serial_number_hex TEXT NOT NULL, access_key TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE (game_version, serial_number_hex) ) """) conn.commit() def site_account_id(username): return bb_account_id(username) def user_has_bb_account(conn, user_id): with conn.cursor() as cur: cur.execute( "SELECT 1 FROM bb_accounts WHERE user_id = %s LIMIT 1", (user_id,), ) return cur.fetchone() is not None def normalize_serial_hex(value, game_version): raw = str(value or "").strip() if not raw: raise ValueError("serial number is required") if game_version == "dc_v2": v = raw.upper() if v.startswith("0X"): v = v[2:] if not re.fullmatch(r"[0-9A-F]{1,8}", v): raise ValueError("DC V2 serial must be hex, like 4E62F237") return f"{int(v, 16):08X}" if game_version == "pc_v2": if not re.fullmatch(r"[0-9]{1,10}", raw): raise ValueError("PC V2 serial must be digits only") n = int(raw, 10) if not (0 <= n <= 0xFFFFFFFF): raise ValueError("PC V2 serial is out of range") return f"{n:08X}" if game_version == "gc_v3": if not re.fullmatch(r"[0-9]{2}-[0-9]{4}-[0-9]{4}", raw): raise ValueError("GC V3 serial must use the dashed format, like 11-3273-6540") n = int(raw.replace("-", ""), 10) if not (0 <= n <= 0xFFFFFFFF): raise ValueError("GC V3 serial is out of range") return f"{n:08X}" raise ValueError("unsupported game version") def validate_secret(value): v = str(value or "").strip() if not v: raise ValueError("key is required") if len(v) > 64: raise ValueError("key is too long") if any(ord(ch) < 0x20 for ch in v): raise ValueError("key contains invalid characters") return v def phosg_string(value): return json.dumps(str(value)) def base_site_license_text(account_id, username): aid_hex = f"0x{int(account_id):X}" uname = phosg_string(username) return ( "{\n" " \"BBTeamID\": 0x0,\n" " \"FormatVersion\": 0x1,\n" f" \"AccountID\": {aid_hex},\n" f" \"LastPlayerName\": {uname},\n" " \"DCNTELicenses\": [],\n" " \"BBLicenses\": [],\n" " \"BanEndTime\": 0x0,\n" " \"PCLicenses\": [],\n" " \"AutoReplyMessage\": \"\",\n" " \"GCLicenses\": [],\n" " \"AutoPatchesEnabled\": [\"PsoPeepsV2EXP_enabled\", \"RareDropNotifications\", \"UltimateMapFix\", \"RaresInQuests\", \"DisableIdleDisconnect\", \"ItemLossPrevention\"],\n" " \"XBLicenses\": [],\n" " \"Flags\": 0x0,\n" " \"Ep3TotalMesetaEarned\": 0x0,\n" " \"Ep3CurrentMeseta\": 0x0,\n" " \"DCLicenses\": [],\n" " \"UserFlags\": 0x0\n" "}" ) def find_array_span(text, array_name): marker = f'"{array_name}":' marker_pos = text.find(marker) if marker_pos < 0: return None bracket_pos = text.find("[", marker_pos) if bracket_pos < 0: return None depth = 0 in_string = False escape = False for i in range(bracket_pos, len(text)): ch = text[i] if in_string: if escape: escape = False elif ch == "\\": escape = True elif ch == '"': in_string = False else: if ch == '"': in_string = True elif ch == "[": depth += 1 elif ch == "]": depth -= 1 if depth == 0: return bracket_pos, i + 1 return None def replace_license_array(text, array_name, entries): span = find_array_span(text, array_name) if span is None: raise RuntimeError(f"could not find {array_name} in license file") start, end = span rendered = "[]" if not entries else "[\n" + ",\n".join(f" {e}" for e in entries) + "\n ]" return text[:start] + rendered + text[end:] def render_v2_entry(row): serial_hex = row["serial_number_hex"].upper() secret = phosg_string(row["access_key"]) return f'{{"SerialNumber": 0x{serial_hex}, "AccessKey": {secret}}}' def rewrite_v2_license_arrays(conn, user, account_id): path = canonical_license_path(account_id) path.parent.mkdir(parents=True, exist_ok=True) text = path.read_text() if path.exists() else base_site_license_text(account_id, user["username"]) with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT game_version, serial_number_hex, access_key FROM v2_v3_key_profiles WHERE user_id = %s ORDER BY id """, (user["id"],)) rows = list(cur.fetchall()) dc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "dc_v2"] pc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "pc_v2"] gc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "gc_v3"] text = replace_license_array(text, "DCLicenses", dc_entries) text = replace_license_array(text, "PCLicenses", pc_entries) text = replace_license_array(text, "GCLicenses", gc_entries) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(text + "\n") tmp.replace(path) sync_root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) flat_path = sync_root / "canonical-system" / "system" / "licenses" / path.name flat_path.parent.mkdir(parents=True, exist_ok=True) flat_tmp = flat_path.with_suffix(flat_path.suffix + ".tmp") flat_tmp.write_text(text + "\n") flat_tmp.replace(flat_path) refresh_account_manifest(account_id) enqueue_account_sync(account_id, "v2_key_profile_updated") return path @app.get("/keys") def list_key_profiles(): user = current_user() if not user: return jsonify({"authenticated": False}), 401 ensure_key_profile_table() account_id = site_account_id(user["username"]) sync = bb_sync_info(account_id) with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT id, game_version, label, serial_number_hex, created_at, updated_at FROM v2_v3_key_profiles WHERE user_id = %s ORDER BY id """, (user["id"],)) rows = list(cur.fetchall()) return jsonify({ "authenticated": True, "account_id": account_id_str(account_id), "sync_status": sync["status"], "regions": sync["regions"], "keys": [{ "id": row["id"], "game_version": row["game_version"], "game_version_label": SUPPORTED_KEY_VERSIONS.get(row["game_version"], {}).get("label", row["game_version"]), "label": row["label"], "serial_number_hex": row["serial_number_hex"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, } for row in rows], }) def license_text_has_serial(text, serial_hex): want_hex = (serial_hex or "").upper().lstrip("0") or "0" for match in re.finditer(r'"SerialNumber"\s*:\s*0x([0-9A-Fa-f]+)', text): got_hex = match.group(1).upper().lstrip("0") or "0" if got_hex == want_hex: return True try: want_dec = str(int(serial_hex, 16)) except ValueError: want_dec = None if want_dec is not None: for match in re.finditer(r'"SerialNumber"\s*:\s*([0-9]+)', text): if match.group(1) == want_dec: return True return False def find_existing_key_owner(serial_hex, current_account_id): root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) current_ids = { account_id_str(current_account_id), str(int(current_account_id)), } def owner_for_license_path(license_path): try: rel = license_path.relative_to(root) parts = rel.parts except Exception: return license_path.stem # canonical/accounts/ACCOUNT/system/licenses/ACCOUNT.json # canonical/REGION/accounts/ACCOUNT/system/licenses/ACCOUNT.json # inbox/REGION/SOURCE/ACCOUNT/system/licenses/ACCOUNT.json for i in range(1, len(parts) - 2): if parts[i:i + 2] == ("system", "licenses"): possible = parts[i - 1] if re.fullmatch(r"[0-9]{10}", possible): return possible # canonical-system/system/licenses/ACCOUNT.json return license_path.stem search_paths = [] seen_paths = set() for base in ( root / "canonical", root / "canonical-system", root / "inbox", ): if not base.is_dir(): continue for license_path in sorted(base.rglob("system/licenses/*.json")): if license_path in seen_paths: continue seen_paths.add(license_path) search_paths.append((license_path, owner_for_license_path(license_path))) for license_path, owner_account_id in search_paths: if owner_account_id in current_ids: continue try: text = license_path.read_text(errors="ignore") except OSError: continue if license_text_has_serial(text, serial_hex): return { "account_id": owner_account_id, "path": str(license_path), } return None @app.post("/keys/register") def register_key_profile(): user = current_user() if not user: return jsonify({"error": "not authenticated"}), 401 if not user.get("email_verified_at"): return jsonify({ "error": "Please verify your email before changing game account settings.", "email_required": True, }), 403 ensure_key_profile_table() with connect() as conn: if not user_has_bb_account(conn, user["id"]): return jsonify({ "error": "Create your Blue Burst account before adding DC V2, PC V2, or GC V3 keys.", "bb_account_required": True, }), 409 data = request.get_json(silent=True) or {} game_version = str(data.get("game_version") or "").strip().lower() label = str(data.get("label") or "").strip()[:80] if game_version not in SUPPORTED_KEY_VERSIONS: return jsonify({"error": "unsupported game version"}), 400 try: serial_hex = normalize_serial_hex(data.get("serial_number"), game_version) key_secret = validate_secret(data.get("access_key")) except ValueError as e: return jsonify({"error": str(e)}), 400 account_id = site_account_id(user["username"]) duplicate = find_existing_key_owner(serial_hex, account_id) if duplicate: return jsonify({ "error": "that key is already registered to another account" }), 409 try: with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" INSERT INTO v2_v3_key_profiles ( user_id, account_id, game_version, label, serial_number_hex, access_key ) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id """, ( user["id"], account_id, game_version, label, serial_hex, key_secret, )) row = cur.fetchone() rewrite_v2_license_arrays(conn, user, account_id) conn.commit() except psycopg.errors.UniqueViolation: return jsonify({"error": "that key is already registered to an account"}), 409 sync = bb_sync_info(account_id) return jsonify({ "ok": True, "key": { "id": row["id"], "game_version": game_version, "game_version_label": SUPPORTED_KEY_VERSIONS[game_version]["label"], "label": label, "serial_number_hex": serial_hex, }, "sync_status": sync["status"], "account_id": account_id_str(account_id), }) @app.get("/keys//access-key") def get_key_access_key(key_id): user = current_user() if not user: return jsonify({"error": "not authenticated"}), 401 if not user.get("email_verified_at"): return jsonify({ "error": "Please verify your email before viewing game account keys.", "email_required": True, }), 403 ensure_key_profile_table() with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT id, game_version, serial_number_hex, access_key FROM v2_v3_key_profiles WHERE id = %s AND user_id = %s LIMIT 1 """, (key_id, user["id"])) row = cur.fetchone() if not row: return jsonify({"error": "key profile not found"}), 404 return jsonify({ "ok": True, "key": { "id": row["id"], "game_version": row["game_version"], "serial_number_hex": row["serial_number_hex"], "access_key": row["access_key"], }, }) @app.delete("/keys/") def delete_key_profile(key_id): user = current_user() if not user: return jsonify({"error": "not authenticated"}), 401 if not user.get("email_verified_at"): return jsonify({ "error": "Please verify your email before changing game account settings.", "email_required": True, }), 403 ensure_key_profile_table() account_id = site_account_id(user["username"]) with connect() as conn: with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: cur.execute(""" SELECT id, game_version, label, serial_number_hex FROM v2_v3_key_profiles WHERE id = %s AND user_id = %s LIMIT 1 """, (key_id, user["id"])) row = cur.fetchone() if not row: return jsonify({"error": "key profile not found"}), 404 cur.execute(""" DELETE FROM v2_v3_key_profiles WHERE id = %s AND user_id = %s """, (key_id, user["id"])) rewrite_v2_license_arrays(conn, user, account_id) conn.commit() sync = bb_sync_info(account_id) return jsonify({ "ok": True, "deleted": { "id": row["id"], "game_version": row["game_version"], "label": row["label"], "serial_number_hex": row["serial_number_hex"], }, "sync_status": sync["status"], "account_id": account_id_str(account_id), })