diff --git a/backend/key_routes.py.before-gc-v3-smtp-fix-20260611T001916Z b/backend/key_routes.py.before-gc-v3-smtp-fix-20260611T001916Z deleted file mode 100644 index 8df5300..0000000 --- a/backend/key_routes.py.before-gc-v3-smtp-fix-20260611T001916Z +++ /dev/null @@ -1,514 +0,0 @@ -import json -import re -import os -from pathlib import Path - -import psycopg - - -SUPPORTED_KEY_VERSIONS = { - "dc_v2": {"label": "DC V2"}, - "pc_v2": {"label": "PC V2"}, - "gc_v3": {"label": "GC V3"}, -} - - -def register_key_routes( - app, - *, - connect, - current_user, - jsonify, - request, - account_id_str, - bb_account_id, - canonical_license_path, - refresh_account_manifest, - enqueue_account_sync, - bb_sync_info, -): - def ensure_key_profile_table(): - with connect() as conn: - with conn.cursor() as cur: - cur.execute(""" - CREATE TABLE IF NOT EXISTS v2_v3_key_profiles ( - id BIGSERIAL PRIMARY KEY, - user_id BIGINT NOT NULL REFERENCES site_users(id) ON DELETE CASCADE, - account_id BIGINT NOT NULL, - game_version TEXT NOT NULL, - label TEXT NOT NULL DEFAULT '', - serial_number_hex TEXT NOT NULL, - access_key TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - UNIQUE (game_version, serial_number_hex) - ) - """) - conn.commit() - - def site_account_id(username): - return bb_account_id(username) - - def user_has_bb_account(conn, user_id): - with conn.cursor() as cur: - cur.execute( - "SELECT 1 FROM bb_accounts WHERE user_id = %s LIMIT 1", - (user_id,), - ) - return cur.fetchone() is not None - - def normalize_serial_hex(value, game_version): - raw = str(value or "").strip() - - if not raw: - raise ValueError("serial number is required") - - if game_version == "dc_v2": - v = raw.upper() - if v.startswith("0X"): - v = v[2:] - if not re.fullmatch(r"[0-9A-F]{1,8}", v): - raise ValueError("DC V2 serial must be hex, like 4E62F237") - return f"{int(v, 16):08X}" - - if game_version == "pc_v2": - if not re.fullmatch(r"[0-9]{1,10}", raw): - raise ValueError("PC V2 serial must be digits only") - n = int(raw, 10) - if not (0 <= n <= 0xFFFFFFFF): - raise ValueError("PC V2 serial is out of range") - return f"{n:08X}" - - if game_version == "gc_v3": - if not re.fullmatch(r"[0-9]{2}-[0-9]{4}-[0-9]{4}", raw): - raise ValueError("GC V3 serial must use the dashed format, like 21-3364-4991") - n = int(raw.replace("-", ""), 10) - if not (0 <= n <= 0xFFFFFFFF): - raise ValueError("GC V3 serial is out of range") - return f"{n:08X}" - - raise ValueError("unsupported game version") - - def validate_secret(value): - v = str(value or "").strip() - if not v: - raise ValueError("key is required") - if len(v) > 64: - raise ValueError("key is too long") - if any(ord(ch) < 0x20 for ch in v): - raise ValueError("key contains invalid characters") - return v - - def phosg_string(value): - return json.dumps(str(value)) - - def base_site_license_text(account_id, username): - aid_hex = f"0x{int(account_id):X}" - uname = phosg_string(username) - return ( - "{\n" - " \"BBTeamID\": 0x0,\n" - " \"FormatVersion\": 0x1,\n" - f" \"AccountID\": {aid_hex},\n" - f" \"LastPlayerName\": {uname},\n" - " \"DCNTELicenses\": [],\n" - " \"BBLicenses\": [],\n" - " \"BanEndTime\": 0x0,\n" - " \"PCLicenses\": [],\n" - " \"AutoReplyMessage\": \"\",\n" - " \"GCLicenses\": [],\n" - " \"AutoPatchesEnabled\": [\"PsoPeepsV2EXP_enabled\", \"RareDropNotifications\", \"UltimateMapFix\", \"RaresInQuests\", \"DisableIdleDisconnect\", \"ItemLossPrevention\"],\n" - " \"XBLicenses\": [],\n" - " \"Flags\": 0x0,\n" - " \"Ep3TotalMesetaEarned\": 0x0,\n" - " \"Ep3CurrentMeseta\": 0x0,\n" - " \"DCLicenses\": [],\n" - " \"UserFlags\": 0x0\n" - "}" - ) - - def find_array_span(text, array_name): - marker = f'"{array_name}":' - marker_pos = text.find(marker) - if marker_pos < 0: - return None - - bracket_pos = text.find("[", marker_pos) - if bracket_pos < 0: - return None - - depth = 0 - in_string = False - escape = False - - for i in range(bracket_pos, len(text)): - ch = text[i] - - if in_string: - if escape: - escape = False - elif ch == "\\": - escape = True - elif ch == '"': - in_string = False - else: - if ch == '"': - in_string = True - elif ch == "[": - depth += 1 - elif ch == "]": - depth -= 1 - if depth == 0: - return bracket_pos, i + 1 - - return None - - def replace_license_array(text, array_name, entries): - span = find_array_span(text, array_name) - if span is None: - raise RuntimeError(f"could not find {array_name} in license file") - - start, end = span - rendered = "[]" if not entries else "[\n" + ",\n".join(f" {e}" for e in entries) + "\n ]" - return text[:start] + rendered + text[end:] - - def render_v2_entry(row): - serial_hex = row["serial_number_hex"].upper() - secret = phosg_string(row["access_key"]) - return f'{{"SerialNumber": 0x{serial_hex}, "AccessKey": {secret}}}' - - - def rewrite_v2_license_arrays(conn, user, account_id): - path = canonical_license_path(account_id) - path.parent.mkdir(parents=True, exist_ok=True) - - text = path.read_text() if path.exists() else base_site_license_text(account_id, user["username"]) - - with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: - cur.execute(""" - SELECT game_version, serial_number_hex, access_key - FROM v2_v3_key_profiles - WHERE user_id = %s - ORDER BY id - """, (user["id"],)) - rows = list(cur.fetchall()) - - dc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "dc_v2"] - pc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "pc_v2"] - gc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "gc_v3"] - - text = replace_license_array(text, "DCLicenses", dc_entries) - text = replace_license_array(text, "PCLicenses", pc_entries) - text = replace_license_array(text, "GCLicenses", gc_entries) - - tmp = path.with_suffix(path.suffix + ".tmp") - tmp.write_text(text + "\n") - tmp.replace(path) - - sync_root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) - flat_path = sync_root / "canonical-system" / "system" / "licenses" / path.name - flat_path.parent.mkdir(parents=True, exist_ok=True) - flat_tmp = flat_path.with_suffix(flat_path.suffix + ".tmp") - flat_tmp.write_text(text + "\n") - flat_tmp.replace(flat_path) - - refresh_account_manifest(account_id) - enqueue_account_sync(account_id, "v2_key_profile_updated") - return path - - - @app.get("/keys") - def list_key_profiles(): - user = current_user() - if not user: - return jsonify({"authenticated": False}), 401 - - ensure_key_profile_table() - account_id = site_account_id(user["username"]) - sync = bb_sync_info(account_id) - - with connect() as conn: - with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: - cur.execute(""" - SELECT id, game_version, label, serial_number_hex, created_at, updated_at - FROM v2_v3_key_profiles - WHERE user_id = %s - ORDER BY id - """, (user["id"],)) - rows = list(cur.fetchall()) - - return jsonify({ - "authenticated": True, - "account_id": account_id_str(account_id), - "sync_status": sync["status"], - "regions": sync["regions"], - "keys": [{ - "id": row["id"], - "game_version": row["game_version"], - "game_version_label": SUPPORTED_KEY_VERSIONS.get(row["game_version"], {}).get("label", row["game_version"]), - "label": row["label"], - "serial_number_hex": row["serial_number_hex"], - "created_at": row["created_at"].isoformat() if row["created_at"] else None, - "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, - } for row in rows], - }) - - - def license_text_has_serial(text, serial_hex): - want_hex = (serial_hex or "").upper().lstrip("0") or "0" - - for match in re.finditer(r'"SerialNumber"\s*:\s*0x([0-9A-Fa-f]+)', text): - got_hex = match.group(1).upper().lstrip("0") or "0" - if got_hex == want_hex: - return True - - try: - want_dec = str(int(serial_hex, 16)) - except ValueError: - want_dec = None - - if want_dec is not None: - for match in re.finditer(r'"SerialNumber"\s*:\s*([0-9]+)', text): - if match.group(1) == want_dec: - return True - - return False - - - def find_existing_key_owner(serial_hex, current_account_id): - root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync")) - - current_ids = { - account_id_str(current_account_id), - str(int(current_account_id)), - } - - def owner_for_license_path(license_path): - try: - rel = license_path.relative_to(root) - parts = rel.parts - except Exception: - return license_path.stem - - # canonical/accounts/ACCOUNT/system/licenses/ACCOUNT.json - # canonical/REGION/accounts/ACCOUNT/system/licenses/ACCOUNT.json - # inbox/REGION/SOURCE/ACCOUNT/system/licenses/ACCOUNT.json - for i in range(1, len(parts) - 2): - if parts[i:i + 2] == ("system", "licenses"): - possible = parts[i - 1] - if re.fullmatch(r"[0-9]{10}", possible): - return possible - - # canonical-system/system/licenses/ACCOUNT.json - return license_path.stem - - search_paths = [] - seen_paths = set() - - for base in ( - root / "canonical", - root / "canonical-system", - root / "inbox", - ): - if not base.is_dir(): - continue - - for license_path in sorted(base.rglob("system/licenses/*.json")): - if license_path in seen_paths: - continue - seen_paths.add(license_path) - search_paths.append((license_path, owner_for_license_path(license_path))) - - for license_path, owner_account_id in search_paths: - if owner_account_id in current_ids: - continue - - try: - text = license_path.read_text(errors="ignore") - except OSError: - continue - - if license_text_has_serial(text, serial_hex): - return { - "account_id": owner_account_id, - "path": str(license_path), - } - - return None - - - @app.post("/keys/register") - def register_key_profile(): - user = current_user() - if not user: - return jsonify({"error": "not authenticated"}), 401 - - if not user.get("email_verified_at"): - return jsonify({ - "error": "Please verify your email before changing game account settings.", - "email_required": True, - }), 403 - - ensure_key_profile_table() - - with connect() as conn: - if not user_has_bb_account(conn, user["id"]): - return jsonify({ - "error": "Create your Blue Burst account before adding DC V2, PC V2, or GC V3 keys.", - "bb_account_required": True, - }), 409 - - data = request.get_json(silent=True) or {} - game_version = str(data.get("game_version") or "").strip().lower() - label = str(data.get("label") or "").strip()[:80] - - if game_version not in SUPPORTED_KEY_VERSIONS: - return jsonify({"error": "unsupported game version"}), 400 - - try: - serial_hex = normalize_serial_hex(data.get("serial_number"), game_version) - key_secret = validate_secret(data.get("access_key")) - except ValueError as e: - return jsonify({"error": str(e)}), 400 - - account_id = site_account_id(user["username"]) - - duplicate = find_existing_key_owner(serial_hex, account_id) - if duplicate: - return jsonify({ - "error": "that key is already registered to another account" - }), 409 - - try: - with connect() as conn: - with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: - cur.execute(""" - INSERT INTO v2_v3_key_profiles ( - user_id, - account_id, - game_version, - label, - serial_number_hex, - access_key - ) VALUES (%s, %s, %s, %s, %s, %s) - RETURNING id - """, ( - user["id"], - account_id, - game_version, - label, - serial_hex, - key_secret, - )) - row = cur.fetchone() - - rewrite_v2_license_arrays(conn, user, account_id) - conn.commit() - - except psycopg.errors.UniqueViolation: - return jsonify({"error": "that key is already registered to an account"}), 409 - - sync = bb_sync_info(account_id) - - return jsonify({ - "ok": True, - "key": { - "id": row["id"], - "game_version": game_version, - "game_version_label": SUPPORTED_KEY_VERSIONS[game_version]["label"], - "label": label, - "serial_number_hex": serial_hex, - }, - "sync_status": sync["status"], - "account_id": account_id_str(account_id), - }) - - - @app.get("/keys//access-key") - def get_key_access_key(key_id): - user = current_user() - if not user: - return jsonify({"error": "not authenticated"}), 401 - - if not user.get("email_verified_at"): - return jsonify({ - "error": "Please verify your email before viewing game account keys.", - "email_required": True, - }), 403 - - ensure_key_profile_table() - - with connect() as conn: - with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: - cur.execute(""" - SELECT id, game_version, serial_number_hex, access_key - FROM v2_v3_key_profiles - WHERE id = %s AND user_id = %s - LIMIT 1 - """, (key_id, user["id"])) - row = cur.fetchone() - - if not row: - return jsonify({"error": "key profile not found"}), 404 - - return jsonify({ - "ok": True, - "key": { - "id": row["id"], - "game_version": row["game_version"], - "serial_number_hex": row["serial_number_hex"], - "access_key": row["access_key"], - }, - }) - - - @app.delete("/keys/") - def delete_key_profile(key_id): - user = current_user() - if not user: - return jsonify({"error": "not authenticated"}), 401 - - if not user.get("email_verified_at"): - return jsonify({ - "error": "Please verify your email before changing game account settings.", - "email_required": True, - }), 403 - - ensure_key_profile_table() - account_id = site_account_id(user["username"]) - - with connect() as conn: - with conn.cursor(row_factory=psycopg.rows.dict_row) as cur: - cur.execute(""" - SELECT id, game_version, label, serial_number_hex - FROM v2_v3_key_profiles - WHERE id = %s AND user_id = %s - LIMIT 1 - """, (key_id, user["id"])) - row = cur.fetchone() - - if not row: - return jsonify({"error": "key profile not found"}), 404 - - cur.execute(""" - DELETE FROM v2_v3_key_profiles - WHERE id = %s AND user_id = %s - """, (key_id, user["id"])) - - rewrite_v2_license_arrays(conn, user, account_id) - conn.commit() - - sync = bb_sync_info(account_id) - - return jsonify({ - "ok": True, - "deleted": { - "id": row["id"], - "game_version": row["game_version"], - "label": row["label"], - "serial_number_hex": row["serial_number_hex"], - }, - "sync_status": sync["status"], - "account_id": account_id_str(account_id), - }) -