Delete backend/key_routes.py.before-gc-v3-accesskey-normalize-20260611T014008Z
This commit is contained in:
@@ -1,530 +0,0 @@
|
||||
import json
|
||||
import re
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg
|
||||
|
||||
|
||||
SUPPORTED_KEY_VERSIONS = {
|
||||
"dc_v2": {"label": "DC V2"},
|
||||
"pc_v2": {"label": "PC V2"},
|
||||
"gc_v3": {"label": "GC V3"},
|
||||
}
|
||||
|
||||
|
||||
def register_key_routes(
|
||||
app,
|
||||
*,
|
||||
connect,
|
||||
current_user,
|
||||
jsonify,
|
||||
request,
|
||||
account_id_str,
|
||||
bb_account_id,
|
||||
canonical_license_path,
|
||||
refresh_account_manifest,
|
||||
enqueue_account_sync,
|
||||
bb_sync_info,
|
||||
):
|
||||
def ensure_key_profile_table():
|
||||
with connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS v2_v3_key_profiles (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
user_id BIGINT NOT NULL REFERENCES site_users(id) ON DELETE CASCADE,
|
||||
account_id BIGINT NOT NULL,
|
||||
game_version TEXT NOT NULL,
|
||||
label TEXT NOT NULL DEFAULT '',
|
||||
serial_number_hex TEXT NOT NULL,
|
||||
access_key TEXT NOT NULL,
|
||||
gc_password TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (game_version, serial_number_hex)
|
||||
)
|
||||
""")
|
||||
cur.execute("""
|
||||
ALTER TABLE v2_v3_key_profiles
|
||||
ADD COLUMN IF NOT EXISTS gc_password TEXT
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
def site_account_id(username):
|
||||
return bb_account_id(username)
|
||||
|
||||
def user_has_bb_account(conn, user_id):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT 1 FROM bb_accounts WHERE user_id = %s LIMIT 1",
|
||||
(user_id,),
|
||||
)
|
||||
return cur.fetchone() is not None
|
||||
|
||||
def normalize_serial_hex(value, game_version):
|
||||
raw = str(value or "").strip()
|
||||
|
||||
if not raw:
|
||||
raise ValueError("serial number is required")
|
||||
|
||||
if game_version == "dc_v2":
|
||||
v = raw.upper()
|
||||
if v.startswith("0X"):
|
||||
v = v[2:]
|
||||
if not re.fullmatch(r"[0-9A-F]{1,8}", v):
|
||||
raise ValueError("DC V2 serial must be hex, like 4E62F237")
|
||||
return f"{int(v, 16):08X}"
|
||||
|
||||
if game_version == "pc_v2":
|
||||
if not re.fullmatch(r"[0-9]{1,10}", raw):
|
||||
raise ValueError("PC V2 serial must be digits only")
|
||||
n = int(raw, 10)
|
||||
if not (0 <= n <= 0xFFFFFFFF):
|
||||
raise ValueError("PC V2 serial is out of range")
|
||||
return f"{n:08X}"
|
||||
|
||||
if game_version == "gc_v3":
|
||||
if not re.fullmatch(r"[0-9]{2}-[0-9]{4}-[0-9]{4}", raw):
|
||||
raise ValueError("GC V3 serial must use the dashed format: NN-NNNN-NNNN.")
|
||||
n = int(raw.replace("-", ""), 10)
|
||||
if not (0 <= n <= 0xFFFFFFFF):
|
||||
raise ValueError("GC V3 serial is out of range")
|
||||
return f"{n:08X}"
|
||||
|
||||
raise ValueError("unsupported game version")
|
||||
|
||||
def validate_secret(value):
|
||||
v = str(value or "").strip()
|
||||
if not v:
|
||||
raise ValueError("key is required")
|
||||
if len(v) > 64:
|
||||
raise ValueError("key is too long")
|
||||
if any(ord(ch) < 0x20 for ch in v):
|
||||
raise ValueError("key contains invalid characters")
|
||||
return v
|
||||
|
||||
def phosg_string(value):
|
||||
return json.dumps(str(value))
|
||||
|
||||
def base_site_license_text(account_id, username):
|
||||
aid_hex = f"0x{int(account_id):X}"
|
||||
uname = phosg_string(username)
|
||||
return (
|
||||
"{\n"
|
||||
" \"BBTeamID\": 0x0,\n"
|
||||
" \"FormatVersion\": 0x1,\n"
|
||||
f" \"AccountID\": {aid_hex},\n"
|
||||
f" \"LastPlayerName\": {uname},\n"
|
||||
" \"DCNTELicenses\": [],\n"
|
||||
" \"BBLicenses\": [],\n"
|
||||
" \"BanEndTime\": 0x0,\n"
|
||||
" \"PCLicenses\": [],\n"
|
||||
" \"AutoReplyMessage\": \"\",\n"
|
||||
" \"GCLicenses\": [],\n"
|
||||
" \"AutoPatchesEnabled\": [\"PsoPeepsV2EXP_enabled\", \"RareDropNotifications\", \"UltimateMapFix\", \"RaresInQuests\", \"DisableIdleDisconnect\", \"ItemLossPrevention\"],\n"
|
||||
" \"XBLicenses\": [],\n"
|
||||
" \"Flags\": 0x0,\n"
|
||||
" \"Ep3TotalMesetaEarned\": 0x0,\n"
|
||||
" \"Ep3CurrentMeseta\": 0x0,\n"
|
||||
" \"DCLicenses\": [],\n"
|
||||
" \"UserFlags\": 0x0\n"
|
||||
"}"
|
||||
)
|
||||
|
||||
def find_array_span(text, array_name):
|
||||
marker = f'"{array_name}":'
|
||||
marker_pos = text.find(marker)
|
||||
if marker_pos < 0:
|
||||
return None
|
||||
|
||||
bracket_pos = text.find("[", marker_pos)
|
||||
if bracket_pos < 0:
|
||||
return None
|
||||
|
||||
depth = 0
|
||||
in_string = False
|
||||
escape = False
|
||||
|
||||
for i in range(bracket_pos, len(text)):
|
||||
ch = text[i]
|
||||
|
||||
if in_string:
|
||||
if escape:
|
||||
escape = False
|
||||
elif ch == "\\":
|
||||
escape = True
|
||||
elif ch == '"':
|
||||
in_string = False
|
||||
else:
|
||||
if ch == '"':
|
||||
in_string = True
|
||||
elif ch == "[":
|
||||
depth += 1
|
||||
elif ch == "]":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
return bracket_pos, i + 1
|
||||
|
||||
return None
|
||||
|
||||
def replace_license_array(text, array_name, entries):
|
||||
span = find_array_span(text, array_name)
|
||||
if span is None:
|
||||
raise RuntimeError(f"could not find {array_name} in license file")
|
||||
|
||||
start, end = span
|
||||
rendered = "[]" if not entries else "[\n" + ",\n".join(f" {e}" for e in entries) + "\n ]"
|
||||
return text[:start] + rendered + text[end:]
|
||||
|
||||
def render_v2_entry(row):
|
||||
serial_hex = row["serial_number_hex"].upper()
|
||||
secret = phosg_string(row["access_key"])
|
||||
|
||||
if row["game_version"] == "gc_v3":
|
||||
gc_password = row.get("gc_password")
|
||||
if not gc_password:
|
||||
raise RuntimeError("GC V3 key profile is missing its GC password")
|
||||
password = phosg_string(gc_password)
|
||||
return f'{{"SerialNumber": 0x{serial_hex}, "AccessKey": {secret}, "Password": {password}}}'
|
||||
|
||||
return f'{{"SerialNumber": 0x{serial_hex}, "AccessKey": {secret}}}'
|
||||
|
||||
|
||||
def rewrite_v2_license_arrays(conn, user, account_id):
|
||||
path = canonical_license_path(account_id)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
text = path.read_text() if path.exists() else base_site_license_text(account_id, user["username"])
|
||||
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT game_version, serial_number_hex, access_key, gc_password
|
||||
FROM v2_v3_key_profiles
|
||||
WHERE user_id = %s
|
||||
ORDER BY id
|
||||
""", (user["id"],))
|
||||
rows = list(cur.fetchall())
|
||||
|
||||
dc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "dc_v2"]
|
||||
pc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "pc_v2"]
|
||||
gc_entries = [render_v2_entry(r) for r in rows if r["game_version"] == "gc_v3"]
|
||||
|
||||
text = replace_license_array(text, "DCLicenses", dc_entries)
|
||||
text = replace_license_array(text, "PCLicenses", pc_entries)
|
||||
text = replace_license_array(text, "GCLicenses", gc_entries)
|
||||
|
||||
tmp = path.with_suffix(path.suffix + ".tmp")
|
||||
tmp.write_text(text + "\n")
|
||||
tmp.replace(path)
|
||||
|
||||
sync_root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync"))
|
||||
flat_path = sync_root / "canonical-system" / "system" / "licenses" / path.name
|
||||
flat_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
flat_tmp = flat_path.with_suffix(flat_path.suffix + ".tmp")
|
||||
flat_tmp.write_text(text + "\n")
|
||||
flat_tmp.replace(flat_path)
|
||||
|
||||
refresh_account_manifest(account_id)
|
||||
enqueue_account_sync(account_id, "v2_key_profile_updated")
|
||||
return path
|
||||
|
||||
|
||||
@app.get("/keys")
|
||||
def list_key_profiles():
|
||||
user = current_user()
|
||||
if not user:
|
||||
return jsonify({"authenticated": False}), 401
|
||||
|
||||
ensure_key_profile_table()
|
||||
account_id = site_account_id(user["username"])
|
||||
sync = bb_sync_info(account_id)
|
||||
|
||||
with connect() as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT id, game_version, label, serial_number_hex, created_at, updated_at
|
||||
FROM v2_v3_key_profiles
|
||||
WHERE user_id = %s
|
||||
ORDER BY id
|
||||
""", (user["id"],))
|
||||
rows = list(cur.fetchall())
|
||||
|
||||
return jsonify({
|
||||
"authenticated": True,
|
||||
"account_id": account_id_str(account_id),
|
||||
"sync_status": sync["status"],
|
||||
"regions": sync["regions"],
|
||||
"keys": [{
|
||||
"id": row["id"],
|
||||
"game_version": row["game_version"],
|
||||
"game_version_label": SUPPORTED_KEY_VERSIONS.get(row["game_version"], {}).get("label", row["game_version"]),
|
||||
"label": row["label"],
|
||||
"serial_number_hex": row["serial_number_hex"],
|
||||
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
||||
"updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
|
||||
} for row in rows],
|
||||
})
|
||||
|
||||
|
||||
def license_text_has_serial(text, serial_hex):
|
||||
want_hex = (serial_hex or "").upper().lstrip("0") or "0"
|
||||
|
||||
for match in re.finditer(r'"SerialNumber"\s*:\s*0x([0-9A-Fa-f]+)', text):
|
||||
got_hex = match.group(1).upper().lstrip("0") or "0"
|
||||
if got_hex == want_hex:
|
||||
return True
|
||||
|
||||
try:
|
||||
want_dec = str(int(serial_hex, 16))
|
||||
except ValueError:
|
||||
want_dec = None
|
||||
|
||||
if want_dec is not None:
|
||||
for match in re.finditer(r'"SerialNumber"\s*:\s*([0-9]+)', text):
|
||||
if match.group(1) == want_dec:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def find_existing_key_owner(serial_hex, current_account_id):
|
||||
root = Path(os.environ.get("ACCOUNT_SYNC_ROOT", "/account-sync"))
|
||||
|
||||
current_ids = {
|
||||
account_id_str(current_account_id),
|
||||
str(int(current_account_id)),
|
||||
}
|
||||
|
||||
def owner_for_license_path(license_path):
|
||||
try:
|
||||
rel = license_path.relative_to(root)
|
||||
parts = rel.parts
|
||||
except Exception:
|
||||
return license_path.stem
|
||||
|
||||
# canonical/accounts/ACCOUNT/system/licenses/ACCOUNT.json
|
||||
# canonical/REGION/accounts/ACCOUNT/system/licenses/ACCOUNT.json
|
||||
# inbox/REGION/SOURCE/ACCOUNT/system/licenses/ACCOUNT.json
|
||||
for i in range(1, len(parts) - 2):
|
||||
if parts[i:i + 2] == ("system", "licenses"):
|
||||
possible = parts[i - 1]
|
||||
if re.fullmatch(r"[0-9]{10}", possible):
|
||||
return possible
|
||||
|
||||
# canonical-system/system/licenses/ACCOUNT.json
|
||||
return license_path.stem
|
||||
|
||||
search_paths = []
|
||||
seen_paths = set()
|
||||
|
||||
for base in (
|
||||
root / "canonical",
|
||||
root / "canonical-system",
|
||||
root / "inbox",
|
||||
):
|
||||
if not base.is_dir():
|
||||
continue
|
||||
|
||||
for license_path in sorted(base.rglob("system/licenses/*.json")):
|
||||
if license_path in seen_paths:
|
||||
continue
|
||||
seen_paths.add(license_path)
|
||||
search_paths.append((license_path, owner_for_license_path(license_path)))
|
||||
|
||||
for license_path, owner_account_id in search_paths:
|
||||
if owner_account_id in current_ids:
|
||||
continue
|
||||
|
||||
try:
|
||||
text = license_path.read_text(errors="ignore")
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
if license_text_has_serial(text, serial_hex):
|
||||
return {
|
||||
"account_id": owner_account_id,
|
||||
"path": str(license_path),
|
||||
}
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@app.post("/keys/register")
|
||||
def register_key_profile():
|
||||
user = current_user()
|
||||
if not user:
|
||||
return jsonify({"error": "not authenticated"}), 401
|
||||
|
||||
if not user.get("email_verified_at"):
|
||||
return jsonify({
|
||||
"error": "Please verify your email before changing game account settings.",
|
||||
"email_required": True,
|
||||
}), 403
|
||||
|
||||
ensure_key_profile_table()
|
||||
|
||||
with connect() as conn:
|
||||
if not user_has_bb_account(conn, user["id"]):
|
||||
return jsonify({
|
||||
"error": "Create your Blue Burst account before adding DC V2, PC V2, or GC V3 keys.",
|
||||
"bb_account_required": True,
|
||||
}), 409
|
||||
|
||||
data = request.get_json(silent=True) or {}
|
||||
game_version = str(data.get("game_version") or "").strip().lower()
|
||||
label = str(data.get("label") or "").strip()[:80]
|
||||
|
||||
if game_version not in SUPPORTED_KEY_VERSIONS:
|
||||
return jsonify({"error": "unsupported game version"}), 400
|
||||
|
||||
try:
|
||||
serial_hex = normalize_serial_hex(data.get("serial_number"), game_version)
|
||||
key_secret = validate_secret(data.get("access_key"))
|
||||
gc_password = validate_secret(data.get("password")) if game_version == "gc_v3" else None
|
||||
except ValueError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
account_id = site_account_id(user["username"])
|
||||
|
||||
duplicate = find_existing_key_owner(serial_hex, account_id)
|
||||
if duplicate:
|
||||
return jsonify({
|
||||
"error": "that key is already registered to another account"
|
||||
}), 409
|
||||
|
||||
try:
|
||||
with connect() as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
INSERT INTO v2_v3_key_profiles (
|
||||
user_id,
|
||||
account_id,
|
||||
game_version,
|
||||
label,
|
||||
serial_number_hex,
|
||||
access_key,
|
||||
gc_password
|
||||
) VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
RETURNING id
|
||||
""", (
|
||||
user["id"],
|
||||
account_id,
|
||||
game_version,
|
||||
label,
|
||||
serial_hex,
|
||||
key_secret,
|
||||
gc_password,
|
||||
))
|
||||
row = cur.fetchone()
|
||||
|
||||
rewrite_v2_license_arrays(conn, user, account_id)
|
||||
conn.commit()
|
||||
|
||||
except psycopg.errors.UniqueViolation:
|
||||
return jsonify({"error": "that key is already registered to an account"}), 409
|
||||
|
||||
sync = bb_sync_info(account_id)
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"key": {
|
||||
"id": row["id"],
|
||||
"game_version": game_version,
|
||||
"game_version_label": SUPPORTED_KEY_VERSIONS[game_version]["label"],
|
||||
"label": label,
|
||||
"serial_number_hex": serial_hex,
|
||||
},
|
||||
"sync_status": sync["status"],
|
||||
"account_id": account_id_str(account_id),
|
||||
})
|
||||
|
||||
|
||||
@app.get("/keys/<int:key_id>/access-key")
|
||||
def get_key_access_key(key_id):
|
||||
user = current_user()
|
||||
if not user:
|
||||
return jsonify({"error": "not authenticated"}), 401
|
||||
|
||||
if not user.get("email_verified_at"):
|
||||
return jsonify({
|
||||
"error": "Please verify your email before viewing game account keys.",
|
||||
"email_required": True,
|
||||
}), 403
|
||||
|
||||
ensure_key_profile_table()
|
||||
|
||||
with connect() as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT id, game_version, serial_number_hex, access_key
|
||||
FROM v2_v3_key_profiles
|
||||
WHERE id = %s AND user_id = %s
|
||||
LIMIT 1
|
||||
""", (key_id, user["id"]))
|
||||
row = cur.fetchone()
|
||||
|
||||
if not row:
|
||||
return jsonify({"error": "key profile not found"}), 404
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"key": {
|
||||
"id": row["id"],
|
||||
"game_version": row["game_version"],
|
||||
"serial_number_hex": row["serial_number_hex"],
|
||||
"access_key": row["access_key"],
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@app.delete("/keys/<int:key_id>")
|
||||
def delete_key_profile(key_id):
|
||||
user = current_user()
|
||||
if not user:
|
||||
return jsonify({"error": "not authenticated"}), 401
|
||||
|
||||
if not user.get("email_verified_at"):
|
||||
return jsonify({
|
||||
"error": "Please verify your email before changing game account settings.",
|
||||
"email_required": True,
|
||||
}), 403
|
||||
|
||||
ensure_key_profile_table()
|
||||
account_id = site_account_id(user["username"])
|
||||
|
||||
with connect() as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute("""
|
||||
SELECT id, game_version, label, serial_number_hex
|
||||
FROM v2_v3_key_profiles
|
||||
WHERE id = %s AND user_id = %s
|
||||
LIMIT 1
|
||||
""", (key_id, user["id"]))
|
||||
row = cur.fetchone()
|
||||
|
||||
if not row:
|
||||
return jsonify({"error": "key profile not found"}), 404
|
||||
|
||||
cur.execute("""
|
||||
DELETE FROM v2_v3_key_profiles
|
||||
WHERE id = %s AND user_id = %s
|
||||
""", (key_id, user["id"]))
|
||||
|
||||
rewrite_v2_license_arrays(conn, user, account_id)
|
||||
conn.commit()
|
||||
|
||||
sync = bb_sync_info(account_id)
|
||||
|
||||
return jsonify({
|
||||
"ok": True,
|
||||
"deleted": {
|
||||
"id": row["id"],
|
||||
"game_version": row["game_version"],
|
||||
"label": row["label"],
|
||||
"serial_number_hex": row["serial_number_hex"],
|
||||
},
|
||||
"sync_status": sync["status"],
|
||||
"account_id": account_id_str(account_id),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user