From 6a4789b2482af970c1ec4579ab23b788c1b76ae4 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 01:22:32 -0400 Subject: [PATCH 1/3] Add login lock session plumbing --- src/AccountSync.cc | 53 ++++++++++++++++++++++++++++++++++++++++++ src/AccountSync.hh | 20 ++++++++++++++++ src/Client.cc | 9 +++++++ src/Client.hh | 3 +++ src/ReceiveCommands.cc | 19 +++++++++++++++ 5 files changed, 104 insertions(+) diff --git a/src/AccountSync.cc b/src/AccountSync.cc index a6680947..335de790 100644 --- a/src/AccountSync.cc +++ b/src/AccountSync.cc @@ -170,6 +170,59 @@ void configure_from_json(const phosg::JSON& json) { configure(cfg); } +asio::awaitable acquire_login_lock( + uint32_t account_id, + const std::string& version_name, + const std::string& existing_session_nonce) { + auto cfg = get_config(); + + LoginLockAcquireResult ret; + if (!cfg.enabled || !cfg.enable_login_locks) { + co_return ret; + } + + if (!existing_session_nonce.empty()) { + ret.session_nonce = existing_session_nonce; + co_return ret; + } + + ret.session_nonce = std::format("{}-{}-{}", source_label(cfg), account_id, now_usecs()); + std::fprintf(stderr, + "[AccountSync] warning login_locks enabled but coordinator acquire is not implemented; allowing account_id=%010u source=%s version=%s nonce=%s\n", + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str(), + ret.session_nonce.c_str()); + + co_return ret; +} + +void notify_login_session_end( + uint32_t account_id, + const std::string& session_nonce, + const std::string& version_name) { + auto cfg = get_config(); + if (!cfg.enabled || !cfg.enable_login_locks) { + return; + } + + std::fprintf(stderr, + "[AccountSync] event=login_session_end source=%s source_region=%s source_ship=%s account_store=%s account_id=%010u session_nonce=%s version=%s\n", + source_label(cfg).c_str(), + cfg.source_region.c_str(), + cfg.source_ship.c_str(), + cfg.account_store.c_str(), + static_cast(account_id), + session_nonce.c_str(), + version_name.c_str()); + + auto line = base_event_json(cfg, "login_session_end", account_id) + + std::format(",\"session_nonce\":\"{}\",\"version\":\"{}\"}}", + json_escape(session_nonce), + json_escape(version_name)); + append_spool_line(cfg, line); +} + void notify_account_saved(uint32_t account_id, const std::string& filename) { auto cfg = get_config(); if (!cfg.enabled || !cfg.notify_account_saves) { diff --git a/src/AccountSync.hh b/src/AccountSync.hh index cbfeb78d..54c538cb 100644 --- a/src/AccountSync.hh +++ b/src/AccountSync.hh @@ -2,6 +2,8 @@ #include #include + +#include #include #include @@ -33,6 +35,24 @@ struct Config { void configure(const Config& cfg); void configure_from_json(const phosg::JSON& json); +struct LoginLockAcquireResult { + bool allowed = true; + bool fail_open_used = false; + std::string session_nonce; + std::string message; + std::string holder_source; +}; + +asio::awaitable acquire_login_lock( + uint32_t account_id, + const std::string& version_name, + const std::string& existing_session_nonce); + +void notify_login_session_end( + uint32_t account_id, + const std::string& session_nonce, + const std::string& version_name); + void notify_account_saved(uint32_t account_id, const std::string& filename); void notify_backup_saved(uint32_t account_id, size_t slot, const std::string& filename); diff --git a/src/Client.cc b/src/Client.cc index 177c7468..eb130905 100644 --- a/src/Client.cc +++ b/src/Client.cc @@ -278,6 +278,15 @@ Client::~Client() { this->bb_character_index); } } + + if (this->account_sync_lock_acquired && this->login && this->login->account) { + AccountSync::notify_login_session_end( + this->login->account->account_id, + this->account_sync_session_nonce, + phosg::name_for_enum(this->version())); + this->account_sync_lock_acquired = false; + } + this->log.info_f("Deleted"); } diff --git a/src/Client.hh b/src/Client.hh index 5bdf509d..40a04b2f 100644 --- a/src/Client.hh +++ b/src/Client.hh @@ -132,6 +132,9 @@ public: uint64_t xb_user_id = 0; uint32_t xb_unknown_a1b = 0; std::shared_ptr login; + bool account_sync_lock_acquired = false; + uint32_t account_sync_lock_account_id = 0; + std::string account_sync_session_nonce; std::shared_ptr proxy_session; // Patch server state (only used for PC_PATCH and BB_PATCH versions) diff --git a/src/ReceiveCommands.cc b/src/ReceiveCommands.cc index f32a0718..326b95af 100644 --- a/src/ReceiveCommands.cc +++ b/src/ReceiveCommands.cc @@ -558,6 +558,25 @@ asio::awaitable start_login_server_procedure(std::shared_ptr c) { static asio::awaitable on_login_complete(std::shared_ptr c) { auto s = c->require_server_state(); + if (c->login && c->login->account) { + auto lock_res = co_await AccountSync::acquire_login_lock( + c->login->account->account_id, + phosg::name_for_enum(c->version()), + c->account_sync_session_nonce); + + if (!lock_res.allowed) { + c->log.info_f("Login lock denied: {}", lock_res.message); + c->channel->disconnect(); + co_return; + } + + if (!lock_res.session_nonce.empty()) { + c->account_sync_lock_acquired = true; + c->account_sync_lock_account_id = c->login->account->account_id; + c->account_sync_session_nonce = lock_res.session_nonce; + } + } + c->convert_account_to_temporary_if_nte(); if (!is_v4(c->version())) { From 56084c736f43e26bd16fff009ebebd6c15b4986f Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 01:26:46 -0400 Subject: [PATCH 2/3] Add coordinator login lock acquire request --- src/AccountSync.cc | 344 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 336 insertions(+), 8 deletions(-) diff --git a/src/AccountSync.cc b/src/AccountSync.cc index 335de790..f617da0b 100644 --- a/src/AccountSync.cc +++ b/src/AccountSync.cc @@ -1,8 +1,12 @@ #include "AccountSync.hh" +#include "AsyncUtils.hh" + +#include #include #include #include +#include #include #include #include @@ -25,6 +29,239 @@ static Config get_config() { return current_config; } +struct ParsedHTTPURL { + std::string host; + uint16_t port = 80; + std::string path = "/"; +}; + +static ParsedHTTPURL parse_http_url(const std::string& url) { + static const std::string prefix = "http://"; + if (!url.starts_with(prefix)) { + throw std::runtime_error("only http:// coordinator URLs are supported"); + } + + size_t host_start = prefix.size(); + size_t path_start = url.find('/', host_start); + std::string host_port = (path_start == std::string::npos) + ? url.substr(host_start) + : url.substr(host_start, path_start - host_start); + + ParsedHTTPURL ret; + ret.path = (path_start == std::string::npos) ? "/" : url.substr(path_start); + if (host_port.empty()) { + throw std::runtime_error("coordinator URL has empty host"); + } + + size_t colon_offset = host_port.rfind(':'); + if (colon_offset == std::string::npos) { + ret.host = host_port; + } else { + ret.host = host_port.substr(0, colon_offset); + std::string port_s = host_port.substr(colon_offset + 1); + if (ret.host.empty() || port_s.empty()) { + throw std::runtime_error("coordinator URL has invalid host/port"); + } + size_t end_offset = 0; + uint64_t port = std::stoull(port_s, &end_offset, 10); + if ((end_offset != port_s.size()) || (port == 0) || (port > 0xFFFF)) { + throw std::runtime_error("coordinator URL has invalid port"); + } + ret.port = port; + } + + return ret; +} + +static std::string join_url_path(std::string base_path, const std::string& suffix) { + while ((base_path.size() > 1) && (base_path.back() == '/')) { + base_path.pop_back(); + } + if (base_path.empty() || (base_path == "/")) { + return suffix; + } + return base_path + suffix; +} + +static std::string lowercase(std::string s) { + std::transform(s.begin(), s.end(), s.begin(), [](unsigned char ch) -> char { + return static_cast(std::tolower(ch)); + }); + return s; +} + +static void trim_ascii_inplace(std::string& s) { + while (!s.empty() && ((s.back() == ' ') || (s.back() == '\t') || (s.back() == '\r') || (s.back() == '\n'))) { + s.pop_back(); + } + size_t start = 0; + while ((start < s.size()) && ((s[start] == ' ') || (s[start] == '\t') || (s[start] == '\r') || (s[start] == '\n'))) { + start++; + } + if (start) { + s = s.substr(start); + } +} + +static asio::awaitable read_http_line( + asio::ip::tcp::socket& sock, + std::string& pending_data, + size_t max_length) { + static const char* delimiter = "\r\n"; + static const size_t delimiter_size = 2; + + size_t delimiter_pos = pending_data.find(delimiter); + while ((delimiter_pos == std::string::npos) && (pending_data.size() < max_length)) { + size_t pre_size = pending_data.size(); + pending_data.resize(std::min(max_length, pending_data.size() + 0x400)); + size_t bytes_read = co_await sock.async_read_some( + asio::buffer(pending_data.data() + pre_size, pending_data.size() - pre_size), + asio::use_awaitable); + pending_data.resize(pre_size + bytes_read); + delimiter_pos = pending_data.find(delimiter, (pre_size >= 1) ? (pre_size - 1) : 0); + } + + if (delimiter_pos == std::string::npos) { + throw std::runtime_error("HTTP response line exceeds maximum length"); + } + + std::string ret = pending_data.substr(0, delimiter_pos); + pending_data = pending_data.substr(delimiter_pos + delimiter_size); + co_return ret; +} + +static asio::awaitable read_http_data( + asio::ip::tcp::socket& sock, + std::string& pending_data, + size_t size) { + std::string ret; + if (pending_data.size() == size) { + pending_data.swap(ret); + } else if (pending_data.size() > size) { + ret = pending_data.substr(0, size); + pending_data = pending_data.substr(size); + } else { + size_t bytes_to_read = size - pending_data.size(); + pending_data.swap(ret); + ret.resize(size); + co_await asio::async_read( + sock, + asio::buffer(ret.data() + size - bytes_to_read, bytes_to_read), + asio::use_awaitable); + } + co_return ret; +} + +static asio::awaitable post_json_with_timeout( + const Config& cfg, + const std::string& path_suffix, + const std::string& body) { + ParsedHTTPURL url = parse_http_url(cfg.coordinator_url); + std::string path = join_url_path(url.path, path_suffix); + + auto executor = co_await asio::this_coro::executor; + auto resolver = std::make_shared(executor); + auto sock = std::make_shared(executor); + auto timer = std::make_shared(executor); + auto timed_out = std::make_shared(false); + + timer->expires_after(std::chrono::microseconds(cfg.request_timeout_usecs)); + timer->async_wait([resolver, sock, timed_out](std::error_code ec) -> void { + if (!ec) { + *timed_out = true; + resolver->cancel(); + if (sock->is_open()) { + sock->close(); + } + } + }); + + try { + auto endpoints = co_await resolver->async_resolve(url.host, std::format("{}", url.port), asio::use_awaitable); + co_await asio::async_connect(*sock, endpoints, asio::use_awaitable); + + std::string host_header = url.host; + if (url.port != 80) { + host_header += std::format(":{}", url.port); + } + + std::string request = std::format( + "POST {} HTTP/1.1\r\n" + "Host: {}\r\n" + "User-Agent: psopeeps-newserv\r\n" + "Content-Type: application/json\r\n" + "Accept: application/json\r\n" + "Connection: close\r\n" + "X-Psopeeps-Admin-Secret: {}\r\n" + "Content-Length: {}\r\n" + "\r\n" + "{}", + path, + host_header, + cfg.shared_secret, + body.size(), + body); + + co_await asio::async_write(*sock, asio::buffer(request), asio::use_awaitable); + + std::string pending_data; + std::string status_line = co_await read_http_line(*sock, pending_data, 0x1000); + if (!status_line.starts_with("HTTP/1.")) { + throw std::runtime_error("invalid HTTP response from coordinator"); + } + + size_t first_space = status_line.find(' '); + if (first_space == std::string::npos) { + throw std::runtime_error("invalid HTTP status line from coordinator"); + } + size_t second_space = status_line.find(' ', first_space + 1); + std::string code_s = status_line.substr( + first_space + 1, + (second_space == std::string::npos) ? std::string::npos : (second_space - first_space - 1)); + int response_code = std::stoi(code_s); + + size_t content_length = 0; + for (;;) { + std::string line = co_await read_http_line(*sock, pending_data, 0x10000); + if (line.empty()) { + break; + } + size_t colon_offset = line.find(':'); + if (colon_offset == std::string::npos) { + continue; + } + std::string name = lowercase(line.substr(0, colon_offset)); + std::string value = line.substr(colon_offset + 1); + trim_ascii_inplace(value); + if (name == "content-length") { + size_t end_offset = 0; + content_length = std::stoull(value, &end_offset, 10); + if (end_offset != value.size()) { + throw std::runtime_error("invalid Content-Length from coordinator"); + } + } + } + + if (response_code != 200) { + throw std::runtime_error(std::format("coordinator returned HTTP {}", response_code)); + } + if (content_length > 0x100000) { + throw std::runtime_error("coordinator response is too large"); + } + + std::string response_body = co_await read_http_data(*sock, pending_data, content_length); + timer->cancel(); + co_return phosg::JSON::parse(response_body); + + } catch (...) { + timer->cancel(); + if (*timed_out) { + throw std::runtime_error("coordinator request timed out"); + } + throw; + } +} + static std::string source_label(const Config& cfg) { if (!cfg.source.empty()) { return cfg.source; @@ -186,15 +423,106 @@ asio::awaitable acquire_login_lock( co_return ret; } - ret.session_nonce = std::format("{}-{}-{}", source_label(cfg), account_id, now_usecs()); - std::fprintf(stderr, - "[AccountSync] warning login_locks enabled but coordinator acquire is not implemented; allowing account_id=%010u source=%s version=%s nonce=%s\n", - static_cast(account_id), - source_label(cfg).c_str(), - version_name.c_str(), - ret.session_nonce.c_str()); + std::string proposed_session_nonce = std::format("{}-{}-{}", source_label(cfg), account_id, now_usecs()); - co_return ret; + if (cfg.coordinator_url.empty()) { + std::string message = "account lock coordinator URL is not configured"; + if (cfg.fail_open) { + ret.allowed = true; + ret.fail_open_used = true; + ret.session_nonce = proposed_session_nonce; + ret.message = message; + std::fprintf(stderr, + "[AccountSync] warning login_lock_fail_open reason=%s account_id=%010u source=%s version=%s nonce=%s\n", + message.c_str(), + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str(), + ret.session_nonce.c_str()); + co_return ret; + } + + ret.allowed = false; + ret.message = "$C6Account lock server\nis unavailable."; + std::fprintf(stderr, + "[AccountSync] login_lock_denied reason=%s account_id=%010u source=%s version=%s\n", + message.c_str(), + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str()); + co_return ret; + } + + std::string body = std::format( + "{{\"account_id\":{},\"account_id_str\":\"{:010}\",\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"account_store\":\"{}\",\"version\":\"{}\",\"session_nonce\":\"{}\"}}", + static_cast(account_id), + static_cast(account_id), + json_escape(source_label(cfg)), + json_escape(cfg.source_region), + json_escape(cfg.source_ship), + json_escape(cfg.account_store), + json_escape(version_name), + json_escape(proposed_session_nonce)); + + try { + phosg::JSON response = co_await post_json_with_timeout(cfg, "/account-locks/acquire", body); + + ret.allowed = response.get_bool("ok", response.get_bool("OK", false)); + ret.session_nonce = response.get_string("session_nonce", proposed_session_nonce); + ret.message = response.get_string("message", ""); + ret.holder_source = response.get_string("holder_source", ""); + + if (ret.allowed) { + if (ret.session_nonce.empty()) { + ret.session_nonce = proposed_session_nonce; + } + std::fprintf(stderr, + "[AccountSync] login_lock_acquired account_id=%010u source=%s version=%s nonce=%s\n", + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str(), + ret.session_nonce.c_str()); + } else { + if (ret.message.empty()) { + ret.message = "$C6Account is already active\non another ship."; + } + std::fprintf(stderr, + "[AccountSync] login_lock_denied account_id=%010u source=%s version=%s holder_source=%s message=%s\n", + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str(), + ret.holder_source.c_str(), + ret.message.c_str()); + } + + co_return ret; + + } catch (const std::exception& e) { + if (cfg.fail_open) { + ret.allowed = true; + ret.fail_open_used = true; + ret.session_nonce = proposed_session_nonce; + ret.message = e.what(); + std::fprintf(stderr, + "[AccountSync] warning login_lock_fail_open reason=%s account_id=%010u source=%s version=%s nonce=%s\n", + e.what(), + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str(), + ret.session_nonce.c_str()); + co_return ret; + } + + ret.allowed = false; + ret.message = "$C6Account lock server\nis unavailable."; + std::fprintf(stderr, + "[AccountSync] login_lock_denied reason=%s account_id=%010u source=%s version=%s\n", + e.what(), + static_cast(account_id), + source_label(cfg).c_str(), + version_name.c_str()); + co_return ret; + } } void notify_login_session_end( From 3d37aacc06265492aac1866f0e7eaed69f8d24cd Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 02:06:07 -0400 Subject: [PATCH 3/3] Add login lock coordinator heartbeat --- src/AccountSync.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++ src/AccountSync.hh | 3 ++ src/ServerState.cc | 1 + 3 files changed, 77 insertions(+) diff --git a/src/AccountSync.cc b/src/AccountSync.cc index f617da0b..95fbf250 100644 --- a/src/AccountSync.cc +++ b/src/AccountSync.cc @@ -3,8 +3,10 @@ #include "AsyncUtils.hh" #include +#include #include #include +#include #include #include #include @@ -18,6 +20,7 @@ namespace AccountSync { static std::mutex config_mutex; static std::mutex spool_mutex; static Config current_config; +static std::atomic heartbeat_task_started(false); static uint64_t now_usecs() { using namespace std::chrono; @@ -402,11 +405,81 @@ void configure_from_json(const phosg::JSON& json) { cfg.notify_player_saves = json.get_bool("NotifyPlayerSaves", true); cfg.notify_backup_saves = json.get_bool("NotifyBackupSaves", true); cfg.enable_login_locks = json.get_bool("EnableLoginLocks", false); + cfg.login_lock_heartbeat_interval_usecs = json.get_int("LoginLockHeartbeatIntervalUsecs", 60000000); cfg.notify_bb_sessions = json.get_bool("NotifyBBSessions", cfg.enable_login_locks); cfg.spool_directory = json.get_string("SpoolDirectory", "system/account-sync-spool"); configure(cfg); } +static asio::awaitable send_login_lock_heartbeat() { + auto cfg = get_config(); + + if (!cfg.enabled || !cfg.enable_login_locks) { + co_return; + } + if (cfg.coordinator_url.empty()) { + std::fprintf(stderr, + "[AccountSync] warning login_lock_heartbeat_skipped reason=coordinator_url_not_configured source=%s\n", + source_label(cfg).c_str()); + co_return; + } + + std::string body = std::format( + "{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"account_store\":\"{}\"}}", + json_escape(source_label(cfg)), + json_escape(cfg.source_region), + json_escape(cfg.source_ship), + json_escape(cfg.account_store)); + + try { + phosg::JSON response = co_await post_json_with_timeout(cfg, "/account-locks/heartbeat", body); + bool ok = response.get_bool("ok", response.get_bool("OK", false)); + if (!ok) { + std::fprintf(stderr, + "[AccountSync] warning login_lock_heartbeat_rejected source=%s response=%s\n", + source_label(cfg).c_str(), + response.serialize().c_str()); + co_return; + } + + int64_t refreshed = response.get_int("refreshed", 0); + std::fprintf(stderr, + "[AccountSync] login_lock_heartbeat_ok source=%s refreshed=%" PRId64 "\n", + source_label(cfg).c_str(), + refreshed); + + } catch (const std::exception& e) { + std::fprintf(stderr, + "[AccountSync] warning login_lock_heartbeat_failed source=%s error=%s\n", + source_label(cfg).c_str(), + e.what()); + } +} + +static asio::awaitable login_lock_heartbeat_task() { + for (;;) { + auto cfg = get_config(); + uint64_t interval_usecs = cfg.login_lock_heartbeat_interval_usecs; + if (interval_usecs < 5000000) { + interval_usecs = 5000000; + } + + co_await async_sleep(std::chrono::microseconds(interval_usecs)); + co_await send_login_lock_heartbeat(); + } +} + +void start_login_lock_heartbeat_task(asio::io_context& io_context) { + bool expected = false; + if (!heartbeat_task_started.compare_exchange_strong(expected, true)) { + return; + } + + asio::co_spawn(io_context, login_lock_heartbeat_task(), asio::detached); + std::fprintf(stderr, "[AccountSync] login lock heartbeat task started\n"); +} + + asio::awaitable acquire_login_lock( uint32_t account_id, const std::string& version_name, diff --git a/src/AccountSync.hh b/src/AccountSync.hh index 54c538cb..3e85da3b 100644 --- a/src/AccountSync.hh +++ b/src/AccountSync.hh @@ -29,6 +29,7 @@ struct Config { bool notify_backup_saves = true; bool notify_bb_sessions = false; bool enable_login_locks = false; // Reserved for future blocking lock behavior + uint64_t login_lock_heartbeat_interval_usecs = 60000000; std::string spool_directory = "system/account-sync-spool"; }; @@ -43,6 +44,8 @@ struct LoginLockAcquireResult { std::string holder_source; }; +void start_login_lock_heartbeat_task(asio::io_context& io_context); + asio::awaitable acquire_login_lock( uint32_t account_id, const std::string& version_name, diff --git a/src/ServerState.cc b/src/ServerState.cc index 78cf9bf5..ee612c80 100644 --- a/src/ServerState.cc +++ b/src/ServerState.cc @@ -881,6 +881,7 @@ void ServerState::load_config_early() { this->client_idle_timeout_usecs = this->config_json->get_int("ClientIdleTimeout", 60000000); this->patch_client_idle_timeout_usecs = this->config_json->get_int("PatchClientIdleTimeout", 300000000); AccountSync::configure_from_json(this->config_json->get("AccountSync", phosg::JSON::dict())); + AccountSync::start_login_lock_heartbeat_task(*this->io_context); this->psopeeps_dcv2_exp_multiplier = this->config_json->get_int("PsoPeepsDCV2EXPMultiplier", 5); if ((this->psopeeps_dcv2_exp_multiplier != 5) && (this->psopeeps_dcv2_exp_multiplier != 10)) { throw std::runtime_error("PsoPeepsDCV2EXPMultiplier must be 5 or 10");