Files

541 lines
19 KiB
Python

import json
import re
import os
from pathlib import Path
import psycopg
SUPPORTED_KEY_VERSIONS = {
"dc_v2": {"label": "DC V2"},
"pc_v2": {"label": "PC V2"},
"gc_v3": {"label": "GC V3"},
}
def register_key_routes(
app,
*,
connect,
current_user,
jsonify,
request,
account_id_str,
bb_account_id,
canonical_license_path,
refresh_account_manifest,
enqueue_account_sync,
bb_sync_info,
):
def ensure_key_profile_table():
with connect() as conn:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS v2_v3_key_profiles (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES site_users(id) ON DELETE CASCADE,
account_id BIGINT NOT NULL,
game_version TEXT NOT NULL,
label TEXT NOT NULL DEFAULT '',
serial_number_hex TEXT NOT NULL,
access_key TEXT NOT NULL,
gc_password TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (game_version, serial_number_hex)
)
""")
cur.execute("""
ALTER TABLE v2_v3_key_profiles
ADD COLUMN IF NOT EXISTS gc_password TEXT
""")
conn.commit()
def site_account_id(username):
return bb_account_id(username)
def user_has_bb_account(conn, user_id):
with conn.cursor() as cur:
cur.execute(
"SELECT 1 FROM bb_accounts WHERE user_id = %s LIMIT 1",
(user_id,),
)
return cur.fetchone() is not None
def normalize_serial_hex(value, game_version):
raw = str(value or "").strip()
if not raw:
raise ValueError("serial number is required")
if game_version == "dc_v2":
v = raw.upper()
if v.startswith("0X"):
v = v[2:]
if not re.fullmatch(r"[0-9A-F]{1,8}", v):
raise ValueError("DC V2 serial must be hex, like 4E62F237")
return f"{int(v, 16):08X}"
if game_version == "pc_v2":
if not re.fullmatch(r"[0-9]{1,10}", raw):
raise ValueError("PC V2 serial must be digits only")
n = int(raw, 10)
if not (0 <= n <= 0xFFFFFFFF):
raise ValueError("PC V2 serial is out of range")
return f"{n:08X}"
if game_version == "gc_v3":
if not re.fullmatch(r"[0-9]{2}-[0-9]{4}-[0-9]{4}", raw):
raise ValueError("GC V3 serial must use the dashed format: NN-NNNN-NNNN.")
n = int(raw.replace("-", ""), 10)
if not (0 <= n <= 0xFFFFFFFF):
raise ValueError("GC V3 serial is out of range")
return f"{n:08X}"
raise ValueError("unsupported game version")
def validate_secret(value):
v = str(value or "").strip()
if not v:
raise ValueError("key is required")
if len(v) > 64:
raise ValueError("key is too long")
if any(ord(ch) < 0x20 for ch in v):
raise ValueError("key contains invalid characters")
return v
def phosg_string(value):
return json.dumps(str(value))
def base_site_license_text(account_id, username):
aid_hex = f"0x{int(account_id):X}"
uname = phosg_string(username)
return (
"{\n"
" \"BBTeamID\": 0x0,\n"
" \"FormatVersion\": 0x1,\n"
f" \"AccountID\": {aid_hex},\n"
f" \"LastPlayerName\": {uname},\n"
" \"DCNTELicenses\": [],\n"
" \"BBLicenses\": [],\n"
" \"BanEndTime\": 0x0,\n"
" \"PCLicenses\": [],\n"
" \"AutoReplyMessage\": \"\",\n"
" \"GCLicenses\": [],\n"
" \"AutoPatchesEnabled\": [\"PsoPeepsV2EXP_enabled\", \"RareDropNotifications\", \"UltimateMapFix\", \"RaresInQuests\", \"DisableIdleDisconnect\", \"ItemLossPrevention\"],\n"
" \"XBLicenses\": [],\n"
" \"Flags\": 0x0,\n"
" \"Ep3TotalMesetaEarned\": 0x0,\n"
" \"Ep3CurrentMeseta\": 0x0,\n"
" \"DCLicenses\": [],\n"
" \"UserFlags\": 0x0\n"
"}"
)
def find_array_span(text, array_name):
marker = f'"{array_name}":'
marker_pos = text.find(marker)
if marker_pos < 0:
return None
bracket_pos = text.find("[", marker_pos)
if bracket_pos < 0:
return None
depth = 0
in_string = False
escape = False
for i in range(bracket_pos, len(text)):
ch = text[i]
if in_string:
if escape:
escape = False
elif ch == "\\":
escape = True
elif ch == '"':
in_string = False
else:
if ch == '"':
in_string = True
elif ch == "[":
depth += 1
elif ch == "]":
depth -= 1
if depth == 0:
return bracket_pos, i + 1
return None
def replace_license_array(text, array_name, entries):
span = find_array_span(text, array_name)
if span is None:
raise RuntimeError(f"could not find {array_name} in license file")
start, end = span
rendered = "[]" if not entries else "[\n" + ",\n".join(f" {e}" for e in entries) + "\n ]"
return text[:start] + rendered + text[end:]
def render_v2_entry(row):
serial_hex = row["serial_number_hex"].upper()
secret = phosg_string(row["access_key"])
if row["game_version"] == "gc_v3":
gc_password = row.get("gc_password")
if not gc_password:
raise RuntimeError("GC V3 key profile is missing its GC password")
password = phosg_string(gc_password)
return f'{{"SerialNumber": 0x{serial_hex}, "AccessKey": {secret}, "Password": {password}}}'
return f'{{"SerialNumber": 0x{serial_hex}, "AccessKey": {secret}}}'
def rewrite_v2_license_arrays(conn, user, account_id):
path = canonical_license_path(account_id)
path.parent.mkdir(parents=True, exist_ok=True)
text = path.read_text() if path.exists() else base_site_license_text(account_id, user["username"])
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT game_version, serial_number_hex, access_key, gc_password
FROM v2_v3_key_profiles
WHERE user_id = %s
ORDER BY id
""", (user["id"],))
rows = list(cur.fetchall())
dc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "dc_v2"]
pc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "pc_v2"]
gc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "gc_v3"]
text = replace_license_array(text, "DCLicenses", dc_entries)
text = replace_license_array(text, "PCLicenses", pc_entries)
text = replace_license_array(text, "GCLicenses", gc_entries)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(text + "\n")
tmp.replace(path)
sync_root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync"))
flat_path = sync_root / "canonical-system" / "system" / "licenses" / path.name
flat_path.parent.mkdir(parents=True, exist_ok=True)
flat_tmp = flat_path.with_suffix(flat_path.suffix + ".tmp")
flat_tmp.write_text(text + "\n")
flat_tmp.replace(flat_path)
refresh_account_manifest(account_id)
enqueue_account_sync(account_id, "v2_key_profile_updated")
return path
@app.get("/keys")
def list_key_profiles():
user = current_user()
if not user:
return jsonify({"authenticated": False}), 401
ensure_key_profile_table()
account_id = site_account_id(user["username"])
sync = bb_sync_info(account_id)
with connect() as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT id, game_version, label, serial_number_hex, created_at, updated_at
FROM v2_v3_key_profiles
WHERE user_id = %s
ORDER BY id
""", (user["id"],))
rows = list(cur.fetchall())
return jsonify({
"authenticated": True,
"account_id": account_id_str(account_id),
"sync_status": sync["status"],
"regions": sync["regions"],
"keys": [{
"id": row["id"],
"game_version": row["game_version"],
"game_version_label": SUPPORTED_KEY_VERSIONS.get(row["game_version"], {}).get("label", row["game_version"]),
"label": row["label"],
"serial_number_hex": row["serial_number_hex"],
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
"updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
} for row in rows],
})
def license_text_has_serial(text, serial_hex):
want_hex = (serial_hex or "").upper().lstrip("0") or "0"
for match in re.finditer(r'"SerialNumber"\s*:\s*0x([0-9A-Fa-f]+)', text):
got_hex = match.group(1).upper().lstrip("0") or "0"
if got_hex == want_hex:
return True
try:
want_dec = str(int(serial_hex, 16))
except ValueError:
want_dec = None
if want_dec is not None:
for match in re.finditer(r'"SerialNumber"\s*:\s*([0-9]+)', text):
if match.group(1) == want_dec:
return True
return False
def find_existing_key_owner(serial_hex, current_account_id):
root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync"))
current_ids = {
account_id_str(current_account_id),
str(int(current_account_id)),
}
def owner_for_license_path(license_path):
try:
rel = license_path.relative_to(root)
parts = rel.parts
except Exception:
return license_path.stem
# canonical/accounts/ACCOUNT/system/licenses/ACCOUNT.json
# canonical/REGION/accounts/ACCOUNT/system/licenses/ACCOUNT.json
# inbox/REGION/SOURCE/ACCOUNT/system/licenses/ACCOUNT.json
for i in range(1, len(parts) - 2):
if parts[i:i + 2] == ("system", "licenses"):
possible = parts[i - 1]
if re.fullmatch(r"[0-9]{10}", possible):
return possible
# canonical-system/system/licenses/ACCOUNT.json
return license_path.stem
search_paths = []
seen_paths = set()
for base in (
root / "canonical",
root / "canonical-system",
root / "inbox",
):
if not base.is_dir():
continue
for license_path in sorted(base.rglob("system/licenses/*.json")):
if license_path in seen_paths:
continue
seen_paths.add(license_path)
search_paths.append((license_path, owner_for_license_path(license_path)))
for license_path, owner_account_id in search_paths:
if owner_account_id in current_ids:
continue
try:
text = license_path.read_text(errors="ignore")
except OSError:
continue
if license_text_has_serial(text, serial_hex):
return {
"account_id": owner_account_id,
"path": str(license_path),
}
return None
def normalize_access_key(value, game_version):
raw = validate_secret(value)
if game_version == "gc_v3":
normalized = raw.replace("-", "").replace(" ", "")
if not normalized.isdigit() or len(normalized) != 12:
raise ValueError("GC V3 access key must be 12 digits.")
return normalized
return raw
@app.post("/keys/register")
def register_key_profile():
user = current_user()
if not user:
return jsonify({"error": "not authenticated"}), 401
if not user.get("email_verified_at"):
return jsonify({
"error": "Please verify your email before changing game account settings.",
"email_required": True,
}), 403
ensure_key_profile_table()
with connect() as conn:
if not user_has_bb_account(conn, user["id"]):
return jsonify({
"error": "Create your Blue Burst account before adding DC V2, PC V2, or GC V3 keys.",
"bb_account_required": True,
}), 409
data = request.get_json(silent=True) or {}
game_version = str(data.get("game_version") or "").strip().lower()
label = str(data.get("label") or "").strip()[:80]
if game_version not in SUPPORTED_KEY_VERSIONS:
return jsonify({"error": "unsupported game version"}), 400
try:
serial_hex = normalize_serial_hex(data.get("serial_number"), game_version)
key_secret = normalize_access_key(data.get("access_key"), game_version)
gc_password = validate_secret(data.get("password")) if game_version == "gc_v3" else None
except ValueError as e:
return jsonify({"error": str(e)}), 400
account_id = site_account_id(user["username"])
duplicate = find_existing_key_owner(serial_hex, account_id)
if duplicate:
return jsonify({
"error": "that key is already registered to another account"
}), 409
try:
with connect() as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
INSERT INTO v2_v3_key_profiles (
user_id,
account_id,
game_version,
label,
serial_number_hex,
access_key,
gc_password
) VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""", (
user["id"],
account_id,
game_version,
label,
serial_hex,
key_secret,
gc_password,
))
row = cur.fetchone()
rewrite_v2_license_arrays(conn, user, account_id)
conn.commit()
except psycopg.errors.UniqueViolation:
return jsonify({"error": "that key is already registered to an account"}), 409
sync = bb_sync_info(account_id)
return jsonify({
"ok": True,
"key": {
"id": row["id"],
"game_version": game_version,
"game_version_label": SUPPORTED_KEY_VERSIONS[game_version]["label"],
"label": label,
"serial_number_hex": serial_hex,
},
"sync_status": sync["status"],
"account_id": account_id_str(account_id),
})
@app.get("/keys/<int:key_id>/access-key")
def get_key_access_key(key_id):
user = current_user()
if not user:
return jsonify({"error": "not authenticated"}), 401
if not user.get("email_verified_at"):
return jsonify({
"error": "Please verify your email before viewing game account keys.",
"email_required": True,
}), 403
ensure_key_profile_table()
with connect() as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT id, game_version, serial_number_hex, access_key
FROM v2_v3_key_profiles
WHERE id = %s AND user_id = %s
LIMIT 1
""", (key_id, user["id"]))
row = cur.fetchone()
if not row:
return jsonify({"error": "key profile not found"}), 404
return jsonify({
"ok": True,
"key": {
"id": row["id"],
"game_version": row["game_version"],
"serial_number_hex": row["serial_number_hex"],
"access_key": row["access_key"],
},
})
@app.delete("/keys/<int:key_id>")
def delete_key_profile(key_id):
user = current_user()
if not user:
return jsonify({"error": "not authenticated"}), 401
if not user.get("email_verified_at"):
return jsonify({
"error": "Please verify your email before changing game account settings.",
"email_required": True,
}), 403
ensure_key_profile_table()
account_id = site_account_id(user["username"])
with connect() as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute("""
SELECT id, game_version, label, serial_number_hex
FROM v2_v3_key_profiles
WHERE id = %s AND user_id = %s
LIMIT 1
""", (key_id, user["id"]))
row = cur.fetchone()
if not row:
return jsonify({"error": "key profile not found"}), 404
cur.execute("""
DELETE FROM v2_v3_key_profiles
WHERE id = %s AND user_id = %s
""", (key_id, user["id"]))
rewrite_v2_license_arrays(conn, user, account_id)
conn.commit()
sync = bb_sync_info(account_id)
return jsonify({
"ok": True,
"deleted": {
"id": row["id"],
"game_version": row["game_version"],
"label": row["label"],
"serial_number_hex": row["serial_number_hex"],
},
"sync_status": sync["status"],
"account_id": account_id_str(account_id),
})