From c4fb18b3b42b512da05545bed0708ec84e534b08 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 21:53:48 -0400 Subject: [PATCH 01/11] Add TeamSync configuration scaffold --- CMakeLists.txt | 1 + src/ServerState.cc | 2 + src/TeamSync.cc | 93 ++++++++++++++++++++++++++++++++++++++++++++++ src/TeamSync.hh | 32 ++++++++++++++++ 4 files changed, 128 insertions(+) create mode 100644 src/TeamSync.cc create mode 100644 src/TeamSync.hh diff --git a/CMakeLists.txt b/CMakeLists.txt index 64c3ac59..4b4c71f1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,6 +51,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/Revision.cc src/Account.cc src/AccountSync.cc + src/TeamSync.cc src/AddressTranslator.cc src/AFSArchive.cc src/AsyncHTTPServer.cc diff --git a/src/ServerState.cc b/src/ServerState.cc index f21d2b73..4d78ca57 100644 --- a/src/ServerState.cc +++ b/src/ServerState.cc @@ -19,6 +19,7 @@ #include "Text.hh" #include "TextIndex.hh" #include "AccountSync.hh" +#include "TeamSync.hh" #ifdef PHOSG_WINDOWS static constexpr bool IS_WINDOWS = true; @@ -886,6 +887,7 @@ void ServerState::load_config_early() { this->patch_client_idle_timeout_usecs = this->config_json->get_int("PatchClientIdleTimeout", 300000000); AccountSync::configure_from_json(this->config_json->get("AccountSync", phosg::JSON::dict())); AccountSync::start_login_lock_heartbeat_task(*this->io_context); + TeamSync::configure_from_json(this->config_json->get("TeamSync", phosg::JSON::dict())); this->psopeeps_dcv2_exp_multiplier = this->config_json->get_int("PsoPeepsDCV2EXPMultiplier", 5); if ((this->psopeeps_dcv2_exp_multiplier != 5) && (this->psopeeps_dcv2_exp_multiplier != 10)) { throw std::runtime_error("PsoPeepsDCV2EXPMultiplier must be 5 or 10"); diff --git a/src/TeamSync.cc b/src/TeamSync.cc new file mode 100644 index 00000000..2250592c --- /dev/null +++ b/src/TeamSync.cc @@ -0,0 +1,93 @@ +#include "TeamSync.hh" + +#include +#include +#include + +namespace TeamSync { + +static std::mutex config_mutex; +static Config current_config; + +static std::string source_label(const Config& cfg) { + if (!cfg.source.empty()) { + return cfg.source; + } + if (!cfg.source_region.empty() && !cfg.source_ship.empty()) { + return cfg.source_region + "-" + cfg.source_ship; + } + if (!cfg.source_ship.empty()) { + return cfg.source_ship; + } + return cfg.source_region; +} + +static Config get_config() { + std::lock_guard g(config_mutex); + return current_config; +} + +void configure(const Config& cfg) { + { + std::lock_guard g(config_mutex); + current_config = cfg; + } + + if (cfg.enabled) { + std::fprintf(stderr, + "[TeamSync] config enabled source=%s source_region=%s source_ship=%s coordinator_url=%s relay_team_chat=%s relay_team_points=%s relay_team_actions=%s\n", + source_label(cfg).c_str(), + cfg.source_region.c_str(), + cfg.source_ship.c_str(), + cfg.coordinator_url.c_str(), + cfg.relay_team_chat ? "true" : "false", + cfg.relay_team_points ? "true" : "false", + cfg.relay_team_actions ? "true" : "false"); + } else { + std::fprintf(stderr, "[TeamSync] config disabled\n"); + } +} + +void configure_from_json(const phosg::JSON& json) { + Config cfg; + cfg.enabled = json.get_bool("Enabled", false); + + const std::string legacy_region = json.get_string("Region", ""); + cfg.source = json.get_string("Source", legacy_region); + cfg.source_region = json.get_string("SourceRegion", ""); + cfg.source_ship = json.get_string("SourceShip", ""); + + if (cfg.source_region.empty() && cfg.source_ship.empty() && !legacy_region.empty()) { + size_t dash_offset = legacy_region.find('-'); + if (dash_offset != std::string::npos) { + cfg.source_region = legacy_region.substr(0, dash_offset); + cfg.source_ship = legacy_region.substr(dash_offset + 1); + } else { + cfg.source_region = legacy_region; + } + } + + cfg.coordinator_url = json.get_string("CoordinatorURL", ""); + cfg.shared_secret = json.get_string("SharedSecret", ""); + cfg.request_timeout_usecs = json.get_int("RequestTimeoutUsecs", 3000000); + if (cfg.request_timeout_usecs < 100000) { + cfg.request_timeout_usecs = 100000; + } + + cfg.relay_team_chat = json.get_bool("RelayTeamChat", false); + cfg.relay_team_points = json.get_bool("RelayTeamPoints", false); + cfg.relay_team_actions = json.get_bool("RelayTeamActions", false); + + configure(cfg); +} + +bool enabled() { + return get_config().enabled; +} + +bool relay_team_chat_enabled() { + auto cfg = get_config(); + return cfg.enabled && cfg.relay_team_chat; +} + +} // namespace TeamSync diff --git a/src/TeamSync.hh b/src/TeamSync.hh new file mode 100644 index 00000000..25eaf41b --- /dev/null +++ b/src/TeamSync.hh @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +#include + +namespace TeamSync { + +struct Config { + bool enabled = false; + + std::string source; + std::string source_region; + std::string source_ship; + + std::string coordinator_url; + std::string shared_secret; + uint64_t request_timeout_usecs = 3000000; + + bool relay_team_chat = false; + bool relay_team_points = false; + bool relay_team_actions = false; +}; + +void configure(const Config& cfg); +void configure_from_json(const phosg::JSON& json); + +bool enabled(); +bool relay_team_chat_enabled(); + +} // namespace TeamSync From bf3c9c08e652cd79e5d9f761d6d0feafaa0ada2e Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 22:00:23 -0400 Subject: [PATCH 02/11] Harden TeamSync config parsing --- src/TeamSync.cc | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/TeamSync.cc b/src/TeamSync.cc index 2250592c..522c9967 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -1,8 +1,10 @@ #include "TeamSync.hh" #include +#include #include #include +#include namespace TeamSync { @@ -57,27 +59,36 @@ void configure_from_json(const phosg::JSON& json) { cfg.source_region = json.get_string("SourceRegion", ""); cfg.source_ship = json.get_string("SourceShip", ""); - if (cfg.source_region.empty() && cfg.source_ship.empty() && !legacy_region.empty()) { - size_t dash_offset = legacy_region.find('-'); + const std::string identity_source = !cfg.source.empty() ? cfg.source : legacy_region; + if (cfg.source_region.empty() && cfg.source_ship.empty() && !identity_source.empty()) { + size_t dash_offset = identity_source.find('-'); if (dash_offset != std::string::npos) { - cfg.source_region = legacy_region.substr(0, dash_offset); - cfg.source_ship = legacy_region.substr(dash_offset + 1); + cfg.source_region = identity_source.substr(0, dash_offset); + cfg.source_ship = identity_source.substr(dash_offset + 1); } else { - cfg.source_region = legacy_region; + cfg.source_region = identity_source; } } cfg.coordinator_url = json.get_string("CoordinatorURL", ""); cfg.shared_secret = json.get_string("SharedSecret", ""); - cfg.request_timeout_usecs = json.get_int("RequestTimeoutUsecs", 3000000); - if (cfg.request_timeout_usecs < 100000) { - cfg.request_timeout_usecs = 100000; + + int64_t request_timeout_usecs = json.get_int("RequestTimeoutUsecs", 3000000); + if (request_timeout_usecs < 100000) { + request_timeout_usecs = 100000; + } else if (request_timeout_usecs > 30000000) { + request_timeout_usecs = 30000000; } + cfg.request_timeout_usecs = static_cast(request_timeout_usecs); cfg.relay_team_chat = json.get_bool("RelayTeamChat", false); cfg.relay_team_points = json.get_bool("RelayTeamPoints", false); cfg.relay_team_actions = json.get_bool("RelayTeamActions", false); + if (cfg.enabled && (cfg.source_region.empty() || cfg.source_ship.empty())) { + throw std::runtime_error("TeamSync requires SourceRegion and SourceShip, or Source/Region in region-ship form"); + } + configure(cfg); } From e9187609ae37a45c3c74ba3b498c53ac97a3078d Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 22:28:35 -0400 Subject: [PATCH 03/11] Add TeamSync exchange task scaffold --- src/ServerState.cc | 1 + src/TeamSync.cc | 460 ++++++++++++++++++++++++++++++++++++++++++++- src/TeamSync.hh | 5 + 3 files changed, 465 insertions(+), 1 deletion(-) diff --git a/src/ServerState.cc b/src/ServerState.cc index 4d78ca57..48924f56 100644 --- a/src/ServerState.cc +++ b/src/ServerState.cc @@ -888,6 +888,7 @@ void ServerState::load_config_early() { AccountSync::configure_from_json(this->config_json->get("AccountSync", phosg::JSON::dict())); AccountSync::start_login_lock_heartbeat_task(*this->io_context); TeamSync::configure_from_json(this->config_json->get("TeamSync", phosg::JSON::dict())); + TeamSync::start_exchange_task(*this->io_context); this->psopeeps_dcv2_exp_multiplier = this->config_json->get_int("PsoPeepsDCV2EXPMultiplier", 5); if ((this->psopeeps_dcv2_exp_multiplier != 5) && (this->psopeeps_dcv2_exp_multiplier != 10)) { throw std::runtime_error("PsoPeepsDCV2EXPMultiplier must be 5 or 10"); diff --git a/src/TeamSync.cc b/src/TeamSync.cc index 522c9967..ca307a1e 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -1,15 +1,36 @@ #include "TeamSync.hh" +#include "AsyncUtils.hh" + +#include +#include +#include +#include #include #include #include +#include +#include #include +#include #include +#include namespace TeamSync { static std::mutex config_mutex; static Config current_config; +static std::atomic exchange_task_started(false); + +struct ExchangeState { + bool bootstrapped = false; + uint64_t cursor = 0; + uint64_t next_seq = 1; + uint64_t consecutive_failures = 0; +}; + +static std::mutex exchange_state_mutex; +static ExchangeState exchange_state; static std::string source_label(const Config& cfg) { if (!cfg.source.empty()) { @@ -29,19 +50,311 @@ static Config get_config() { return current_config; } +static ExchangeState get_exchange_state() { + std::lock_guard g(exchange_state_mutex); + return exchange_state; +} + +static void reset_exchange_state() { + std::lock_guard g(exchange_state_mutex); + exchange_state = ExchangeState(); +} + +static bool exchange_enabled(const Config& cfg) { + return cfg.enabled && (cfg.relay_team_chat || cfg.relay_team_points || cfg.relay_team_actions); +} + +static uint64_t exchange_sleep_usecs(const Config& cfg, uint64_t consecutive_failures) { + uint64_t base = cfg.exchange_interval_usecs; + if (consecutive_failures) { + uint64_t multiplier = 1ULL << std::min(consecutive_failures, 5); + base = std::min(30000000, base * multiplier); + } + + static std::mutex jitter_mutex; + static std::mt19937_64 rng(std::random_device{}()); + std::lock_guard g(jitter_mutex); + std::uniform_int_distribution dist(0, std::max(1, base / 5)); + return base + dist(rng); +} + +struct ParsedHTTPURL { + std::string host; + uint16_t port = 80; + std::string path = "/"; +}; + +static ParsedHTTPURL parse_http_url(const std::string& url) { + static const std::string prefix = "http://"; + if (!url.starts_with(prefix)) { + throw std::runtime_error("only http:// coordinator URLs are supported"); + } + + size_t host_start = prefix.size(); + size_t path_start = url.find('/', host_start); + std::string host_port = (path_start == std::string::npos) + ? url.substr(host_start) + : url.substr(host_start, path_start - host_start); + + ParsedHTTPURL ret; + ret.path = (path_start == std::string::npos) ? "/" : url.substr(path_start); + if (host_port.empty()) { + throw std::runtime_error("coordinator URL has empty host"); + } + + size_t colon_offset = host_port.rfind(':'); + if (colon_offset == std::string::npos) { + ret.host = host_port; + } else { + ret.host = host_port.substr(0, colon_offset); + std::string port_s = host_port.substr(colon_offset + 1); + if (ret.host.empty() || port_s.empty()) { + throw std::runtime_error("coordinator URL has invalid host/port"); + } + size_t end_offset = 0; + uint64_t port = std::stoull(port_s, &end_offset, 10); + if ((end_offset != port_s.size()) || (port == 0) || (port > 0xFFFF)) { + throw std::runtime_error("coordinator URL has invalid port"); + } + ret.port = port; + } + + return ret; +} + +static std::string join_url_path(std::string base_path, const std::string& suffix) { + while ((base_path.size() > 1) && (base_path.back() == '/')) { + base_path.pop_back(); + } + if (base_path.empty() || (base_path == "/")) { + return suffix; + } + return base_path + suffix; +} + +static std::string lowercase(std::string s) { + std::transform(s.begin(), s.end(), s.begin(), [](unsigned char ch) -> char { + return static_cast(std::tolower(ch)); + }); + return s; +} + +static void trim_ascii_inplace(std::string& s) { + while (!s.empty() && ((s.back() == ' ') || (s.back() == '\t') || (s.back() == '\r') || (s.back() == '\n'))) { + s.pop_back(); + } + size_t start = 0; + while ((start < s.size()) && ((s[start] == ' ') || (s[start] == '\t') || (s[start] == '\r') || (s[start] == '\n'))) { + start++; + } + if (start) { + s = s.substr(start); + } +} + +static std::string json_escape(const std::string& s) { + std::string ret; + ret.reserve(s.size() + 2); + for (char ch : s) { + if (ch == '\\') { + ret += "\\\\"; + } else if (ch == '"') { + ret += "\\\""; + } else if (ch == '\n') { + ret += "\\n"; + } else if (ch == '\r') { + ret += "\\r"; + } else if (ch == '\t') { + ret += "\\t"; + } else { + ret += ch; + } + } + return ret; +} + +static asio::awaitable read_http_line( + asio::ip::tcp::socket& sock, + std::string& pending_data, + size_t max_length) { + static const char* delimiter = "\r\n"; + static const size_t delimiter_size = 2; + + size_t delimiter_pos = pending_data.find(delimiter); + while ((delimiter_pos == std::string::npos) && (pending_data.size() < max_length)) { + size_t pre_size = pending_data.size(); + pending_data.resize(std::min(max_length, pending_data.size() + 0x400)); + size_t bytes_read = co_await sock.async_read_some( + asio::buffer(pending_data.data() + pre_size, pending_data.size() - pre_size), + asio::use_awaitable); + pending_data.resize(pre_size + bytes_read); + delimiter_pos = pending_data.find(delimiter, (pre_size >= 1) ? (pre_size - 1) : 0); + } + + if (delimiter_pos == std::string::npos) { + throw std::runtime_error("HTTP response line exceeds maximum length"); + } + + std::string ret = pending_data.substr(0, delimiter_pos); + pending_data = pending_data.substr(delimiter_pos + delimiter_size); + co_return ret; +} + +static asio::awaitable read_http_data( + asio::ip::tcp::socket& sock, + std::string& pending_data, + size_t size) { + std::string ret; + if (pending_data.size() == size) { + pending_data.swap(ret); + } else if (pending_data.size() > size) { + ret = pending_data.substr(0, size); + pending_data = pending_data.substr(size); + } else { + size_t bytes_to_read = size - pending_data.size(); + pending_data.swap(ret); + ret.resize(size); + co_await asio::async_read( + sock, + asio::buffer(ret.data() + size - bytes_to_read, bytes_to_read), + asio::use_awaitable); + } + co_return ret; +} + +static asio::awaitable post_json_with_timeout( + const Config& cfg, + const std::string& path_suffix, + const std::string& body) { + ParsedHTTPURL url = parse_http_url(cfg.coordinator_url); + std::string path = join_url_path(url.path, path_suffix); + + auto executor = co_await asio::this_coro::executor; + auto resolver = std::make_shared(executor); + auto sock = std::make_shared(executor); + auto timer = std::make_shared(executor); + auto timed_out = std::make_shared(false); + + timer->expires_after(std::chrono::microseconds(cfg.request_timeout_usecs)); + timer->async_wait([resolver, sock, timed_out](std::error_code ec) -> void { + if (!ec) { + *timed_out = true; + resolver->cancel(); + if (sock->is_open()) { + sock->close(); + } + } + }); + + try { + auto endpoints = co_await resolver->async_resolve(url.host, std::format("{}", url.port), asio::use_awaitable); + co_await asio::async_connect(*sock, endpoints, asio::use_awaitable); + + std::string host_header = url.host; + if (url.port != 80) { + host_header += std::format(":{}", url.port); + } + + std::string request = std::format( + "POST {} HTTP/1.1\r\n" + "Host: {}\r\n" + "User-Agent: psopeeps-newserv\r\n" + "Content-Type: application/json\r\n" + "Accept: application/json\r\n" + "Connection: close\r\n" + "X-Psopeeps-Team-Sync-Secret: {}\r\n" + "Content-Length: {}\r\n" + "\r\n" + "{}", + path, + host_header, + cfg.shared_secret, + body.size(), + body); + + co_await asio::async_write(*sock, asio::buffer(request), asio::use_awaitable); + + std::string pending_data; + std::string status_line = co_await read_http_line(*sock, pending_data, 0x1000); + if (!status_line.starts_with("HTTP/1.")) { + throw std::runtime_error("invalid HTTP response from coordinator"); + } + + size_t first_space = status_line.find(' '); + if (first_space == std::string::npos) { + throw std::runtime_error("invalid HTTP status line from coordinator"); + } + size_t second_space = status_line.find(' ', first_space + 1); + std::string code_s = status_line.substr( + first_space + 1, + (second_space == std::string::npos) ? std::string::npos : (second_space - first_space - 1)); + int response_code = std::stoi(code_s); + + size_t content_length = 0; + for (;;) { + std::string line = co_await read_http_line(*sock, pending_data, 0x10000); + if (line.empty()) { + break; + } + size_t colon_offset = line.find(':'); + if (colon_offset == std::string::npos) { + continue; + } + std::string name = lowercase(line.substr(0, colon_offset)); + std::string value = line.substr(colon_offset + 1); + trim_ascii_inplace(value); + if (name == "content-length") { + size_t end_offset = 0; + content_length = std::stoull(value, &end_offset, 10); + if (end_offset != value.size()) { + throw std::runtime_error("invalid Content-Length from coordinator"); + } + } + } + + if (response_code != 200) { + throw std::runtime_error(std::format("coordinator returned HTTP {}", response_code)); + } + if (content_length > 0x100000) { + throw std::runtime_error("coordinator response is too large"); + } + + std::string response_body = co_await read_http_data(*sock, pending_data, content_length); + timer->cancel(); + co_return phosg::JSON::parse(response_body); + + } catch (...) { + timer->cancel(); + if (*timed_out) { + throw std::runtime_error("coordinator request timed out"); + } + throw; + } +} + void configure(const Config& cfg) { + Config old_cfg; { std::lock_guard g(config_mutex); + old_cfg = current_config; current_config = cfg; } + if ((old_cfg.source_region != cfg.source_region) || + (old_cfg.source_ship != cfg.source_ship) || + (old_cfg.team_namespace != cfg.team_namespace)) { + reset_exchange_state(); + } + if (cfg.enabled) { std::fprintf(stderr, - "[TeamSync] config enabled source=%s source_region=%s source_ship=%s coordinator_url=%s relay_team_chat=%s relay_team_points=%s relay_team_actions=%s\n", + "[TeamSync] config enabled source=%s source_region=%s source_ship=%s team_namespace=%s coordinator_url=%s exchange_interval_usecs=%llu relay_team_chat=%s relay_team_points=%s relay_team_actions=%s\n", source_label(cfg).c_str(), cfg.source_region.c_str(), cfg.source_ship.c_str(), + cfg.team_namespace.c_str(), cfg.coordinator_url.c_str(), + static_cast(cfg.exchange_interval_usecs), cfg.relay_team_chat ? "true" : "false", cfg.relay_team_points ? "true" : "false", cfg.relay_team_actions ? "true" : "false"); @@ -70,6 +383,7 @@ void configure_from_json(const phosg::JSON& json) { } } + cfg.team_namespace = json.get_string("TeamNamespace", "bb"); cfg.coordinator_url = json.get_string("CoordinatorURL", ""); cfg.shared_secret = json.get_string("SharedSecret", ""); @@ -81,6 +395,14 @@ void configure_from_json(const phosg::JSON& json) { } cfg.request_timeout_usecs = static_cast(request_timeout_usecs); + int64_t exchange_interval_usecs = json.get_int("ExchangeIntervalUsecs", 1500000); + if (exchange_interval_usecs < 250000) { + exchange_interval_usecs = 250000; + } else if (exchange_interval_usecs > 30000000) { + exchange_interval_usecs = 30000000; + } + cfg.exchange_interval_usecs = static_cast(exchange_interval_usecs); + cfg.relay_team_chat = json.get_bool("RelayTeamChat", false); cfg.relay_team_points = json.get_bool("RelayTeamPoints", false); cfg.relay_team_actions = json.get_bool("RelayTeamActions", false); @@ -88,6 +410,9 @@ void configure_from_json(const phosg::JSON& json) { if (cfg.enabled && (cfg.source_region.empty() || cfg.source_ship.empty())) { throw std::runtime_error("TeamSync requires SourceRegion and SourceShip, or Source/Region in region-ship form"); } + if (cfg.enabled && cfg.team_namespace.empty()) { + throw std::runtime_error("TeamSync requires non-empty TeamNamespace"); + } configure(cfg); } @@ -101,4 +426,137 @@ bool relay_team_chat_enabled() { return cfg.enabled && cfg.relay_team_chat; } +// This helper only builds empty event batches from trusted config fields. +// Do not extend the format-string JSON path for player-controlled data; use a +// real JSON object/serializer when team chat events are added. +static std::string exchange_body_for_empty_events(const Config& cfg, const ExchangeState& state) { + if (state.bootstrapped) { + return std::format( + "{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"team_namespace\":\"{}\",\"cursor\":{},\"events\":[]}}", + json_escape(source_label(cfg)), + json_escape(cfg.source_region), + json_escape(cfg.source_ship), + json_escape(cfg.team_namespace), + state.cursor); + } + + return std::format( + "{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"team_namespace\":\"{}\",\"cursor\":\"\",\"events\":[]}}", + json_escape(source_label(cfg)), + json_escape(cfg.source_region), + json_escape(cfg.source_ship), + json_escape(cfg.team_namespace)); +} + +static asio::awaitable run_empty_exchange_once(const Config& cfg) { + auto before_state = get_exchange_state(); + std::string body = exchange_body_for_empty_events(cfg, before_state); + phosg::JSON response = co_await post_json_with_timeout(cfg, "/team-sync/exchange", body); + + if (!response.get_bool("ok", false)) { + throw std::runtime_error("coordinator returned ok=false"); + } + + uint64_t ack_max_seq = response.get_int("ack_max_seq", 0); + uint64_t next_cursor = response.get_int("next_cursor", 0); + bool truncated = response.get_bool("truncated", false); + size_t inbound_events = response.get("events", phosg::JSON::list()).as_list().size(); + + { + std::lock_guard g(exchange_state_mutex); + + bool cursor_regressed = exchange_state.bootstrapped && (next_cursor < exchange_state.cursor); + bool ack_regressed = exchange_state.bootstrapped && (exchange_state.next_seq > 1) && (ack_max_seq < (exchange_state.next_seq - 1)); + if (cursor_regressed || ack_regressed) { + std::fprintf(stderr, + "[TeamSync] warning exchange_state_reset_detected source=%s team_namespace=%s old_cursor=%llu new_cursor=%llu old_next_seq=%llu ack_max_seq=%llu\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str(), + static_cast(exchange_state.cursor), + static_cast(next_cursor), + static_cast(exchange_state.next_seq), + static_cast(ack_max_seq)); + exchange_state = ExchangeState(); + co_return; + } + + if (!exchange_state.bootstrapped) { + exchange_state.next_seq = ack_max_seq + 1; + exchange_state.bootstrapped = true; + std::fprintf(stderr, + "[TeamSync] exchange bootstrap source=%s team_namespace=%s ack_max_seq=%llu next_seq=%llu cursor=%llu\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str(), + static_cast(ack_max_seq), + static_cast(exchange_state.next_seq), + static_cast(next_cursor)); + } + exchange_state.cursor = next_cursor; + exchange_state.consecutive_failures = 0; + } + + if (inbound_events || truncated) { + std::fprintf(stderr, + "[TeamSync] exchange received source=%s team_namespace=%s inbound_events=%zu truncated=%s cursor=%llu\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str(), + inbound_events, + truncated ? "true" : "false", + static_cast(next_cursor)); + } + + co_return; +} + +static asio::awaitable exchange_task() { + for (;;) { + auto cfg = get_config(); + + if (!exchange_enabled(cfg)) { + co_await async_sleep(std::chrono::seconds(5)); + continue; + } + + if (cfg.coordinator_url.empty()) { + std::fprintf(stderr, + "[TeamSync] warning exchange_skipped reason=coordinator_url_not_configured source=%s team_namespace=%s\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str()); + co_await async_sleep(std::chrono::microseconds(exchange_sleep_usecs(cfg, 1))); + continue; + } + + try { + co_await run_empty_exchange_once(cfg); + + } catch (const std::exception& e) { + uint64_t failures = 0; + { + std::lock_guard g(exchange_state_mutex); + exchange_state.consecutive_failures++; + failures = exchange_state.consecutive_failures; + } + + std::fprintf(stderr, + "[TeamSync] warning exchange_failed source=%s team_namespace=%s failures=%llu error=%s\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str(), + static_cast(failures), + e.what()); + } + + co_await async_sleep(std::chrono::microseconds(exchange_sleep_usecs(cfg, get_exchange_state().consecutive_failures))); + } +} + +void start_exchange_task(asio::io_context& io_context) { + bool expected = false; + if (!exchange_task_started.compare_exchange_strong(expected, true)) { + return; + } + + asio::co_spawn(io_context, exchange_task(), asio::detached); + std::fprintf(stderr, "[TeamSync] exchange task started\n"); +} + } // namespace TeamSync diff --git a/src/TeamSync.hh b/src/TeamSync.hh index 25eaf41b..6dbfca2d 100644 --- a/src/TeamSync.hh +++ b/src/TeamSync.hh @@ -3,6 +3,7 @@ #include #include +#include #include namespace TeamSync { @@ -13,10 +14,12 @@ struct Config { std::string source; std::string source_region; std::string source_ship; + std::string team_namespace = "bb"; std::string coordinator_url; std::string shared_secret; uint64_t request_timeout_usecs = 3000000; + uint64_t exchange_interval_usecs = 1500000; bool relay_team_chat = false; bool relay_team_points = false; @@ -29,4 +32,6 @@ void configure_from_json(const phosg::JSON& json); bool enabled(); bool relay_team_chat_enabled(); +void start_exchange_task(asio::io_context& io_context); + } // namespace TeamSync From 6995e5b7f4de5f93011619204ce888c16fb9fc0c Mon Sep 17 00:00:00 2001 From: James Osborne Date: Thu, 11 Jun 2026 22:34:47 -0400 Subject: [PATCH 04/11] Improve TeamSync coordinator error logging --- src/TeamSync.cc | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/TeamSync.cc b/src/TeamSync.cc index ca307a1e..c97409db 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -312,14 +312,19 @@ static asio::awaitable post_json_with_timeout( } } - if (response_code != 200) { - throw std::runtime_error(std::format("coordinator returned HTTP {}", response_code)); - } if (content_length > 0x100000) { throw std::runtime_error("coordinator response is too large"); } std::string response_body = co_await read_http_data(*sock, pending_data, content_length); + + if (response_code != 200) { + throw std::runtime_error(std::format( + "coordinator returned HTTP {} body={}", + response_code, + response_body)); + } + timer->cancel(); co_return phosg::JSON::parse(response_body); From 4d893607c2c411b3d7604001a0c8cf85ef59570e Mon Sep 17 00:00:00 2001 From: James Osborne Date: Fri, 12 Jun 2026 00:57:17 -0400 Subject: [PATCH 05/11] Add TeamSync canonical team state apply scaffold --- src/ServerState.cc | 12 ++++++ src/TeamIndex.cc | 96 ++++++++++++++++++++++++++++++++++++++++++++++ src/TeamIndex.hh | 4 ++ src/TeamSync.cc | 25 ++++++++++++ src/TeamSync.hh | 4 ++ 5 files changed, 141 insertions(+) diff --git a/src/ServerState.cc b/src/ServerState.cc index 48924f56..b8dedf6d 100644 --- a/src/ServerState.cc +++ b/src/ServerState.cc @@ -1648,6 +1648,18 @@ void ServerState::load_accounts() { void ServerState::load_teams() { config_log.info_f("Indexing teams"); this->team_index = std::make_shared("system/teams", this->team_reward_defs_json); + + TeamSync::set_canonical_team_state_callback([this](const phosg::JSON& canonical_team_state) -> void { + if (!this->team_index) { + return; + } + try { + this->team_index->replace_all_from_authority(canonical_team_state); + config_log.info_f("Applied canonical TeamSync team state"); + } catch (const std::exception& e) { + config_log.warning_f("Failed to apply canonical TeamSync team state: {}", e.what()); + } + }); } void ServerState::load_patch_indexes() { diff --git a/src/TeamIndex.cc b/src/TeamIndex.cc index 4b7e11b7..d78654be 100644 --- a/src/TeamIndex.cc +++ b/src/TeamIndex.cc @@ -454,6 +454,102 @@ void TeamIndex::buy_reward(uint32_t team_id, const std::string& key, uint32_t po team->save_config(); } +void TeamIndex::replace_all_from_authority(const phosg::JSON& canonical_team_state) { + uint32_t new_next_team_id = canonical_team_state.get_int("next_team_id", 1); + if (new_next_team_id < 1) { + new_next_team_id = 1; + } + + std::unordered_map> new_id_to_team; + std::unordered_map> new_name_to_team; + std::unordered_map> new_account_id_to_team; + + const auto& teams_json = canonical_team_state.get("teams", phosg::JSON::list()).as_list(); + for (const auto& team_json_p : teams_json) { + const auto& team_json = *team_json_p; + uint32_t team_id = team_json.get_int("team_id"); + if (team_id == 0) { + throw std::runtime_error("authority team has invalid team_id"); + } + + auto team = std::make_shared(team_id); + team->name = team_json.get_string("name", ""); + if (team->name.empty()) { + throw std::runtime_error("authority team has empty name"); + } + + team->reward_flags = team_json.get_int("reward_flags", 0); + team->spent_points = team_json.get_int("spent_points", 0); + team->points = 0; + + team->reward_keys.clear(); + for (const auto& key_json_p : team_json.get("reward_keys", phosg::JSON::list()).as_list()) { + team->reward_keys.emplace(key_json_p->as_string()); + } + + const auto& members_json = team_json.get("members", phosg::JSON::list()).as_list(); + for (const auto& member_json_p : members_json) { + const auto& member_json = *member_json_p; + Team::Member m; + m.account_id = member_json.get_int("account_id", member_json.get_int("AccountID", 0)); + m.flags = member_json.get_int("flags", member_json.get_int("Flags", 0)); + m.points = member_json.get_int("points", member_json.get_int("Points", 0)); + m.name = member_json.get_string("name", member_json.get_string("Name", "")); + + if (m.account_id == 0) { + throw std::runtime_error("authority team member has invalid account_id"); + } + if (m.check_flag(Team::Member::Flag::IS_MASTER)) { + team->master_account_id = m.account_id; + } + team->points += m.points; + team->members.emplace(m.account_id, std::move(m)); + } + + if (team->members.empty()) { + throw std::runtime_error("authority team has no members"); + } + + if (!new_id_to_team.emplace(team->team_id, team).second) { + throw std::runtime_error("authority state has duplicate team_id"); + } + if (!new_name_to_team.emplace(team->name, team).second) { + throw std::runtime_error("authority state has duplicate team name"); + } + for (const auto& [account_id, member] : team->members) { + if (!new_account_id_to_team.emplace(account_id, team).second) { + throw std::runtime_error("authority state has account in multiple teams"); + } + } + } + + std::filesystem::create_directories(this->directory.c_str()); + + for (const auto& item : std::filesystem::directory_iterator(this->directory)) { + const std::string filename = item.path().filename().string(); + if ((filename != "base.json") && (filename.ends_with(".json") || filename.ends_with(".bmp"))) { + std::filesystem::remove(item.path()); + } + } + + phosg::save_file( + this->directory + "/base.json", + phosg::JSON::dict({{"NextTeamID", new_next_team_id}}).serialize( + phosg::JSON::SerializeOption::FORMAT | + phosg::JSON::SerializeOption::HEX_INTEGERS | + phosg::JSON::SerializeOption::ESCAPE_CONTROLS_ONLY)); + + this->next_team_id = new_next_team_id; + this->id_to_team.swap(new_id_to_team); + this->name_to_team.swap(new_name_to_team); + this->account_id_to_team.swap(new_account_id_to_team); + + for (const auto& [team_id, team] : this->id_to_team) { + team->save_config(); + } +} + + void TeamIndex::add_to_indexes(std::shared_ptr team) { if (!this->id_to_team.emplace(team->team_id, team).second) { throw std::runtime_error("team ID is already in use"); diff --git a/src/TeamIndex.hh b/src/TeamIndex.hh index eaab5c0c..a78b1233 100644 --- a/src/TeamIndex.hh +++ b/src/TeamIndex.hh @@ -144,6 +144,10 @@ public: void change_master(uint32_t master_account_id, uint32_t new_master_account_id); void buy_reward(uint32_t team_id, const std::string& key, uint32_t points, Team::RewardFlag reward_flag); + // Replaces all local BB team state with coordinator-authoritative state. + // This updates disk and in-memory indexes immediately. + void replace_all_from_authority(const phosg::JSON& canonical_team_state); + protected: std::string directory; uint32_t next_team_id; diff --git a/src/TeamSync.cc b/src/TeamSync.cc index c97409db..e57dce0d 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,9 @@ struct ExchangeState { static std::mutex exchange_state_mutex; static ExchangeState exchange_state; +static std::mutex canonical_team_state_callback_mutex; +static CanonicalTeamStateCallback canonical_team_state_callback; + static std::string source_label(const Config& cfg) { if (!cfg.source.empty()) { return cfg.source; @@ -431,6 +435,22 @@ bool relay_team_chat_enabled() { return cfg.enabled && cfg.relay_team_chat; } +void set_canonical_team_state_callback(CanonicalTeamStateCallback cb) { + std::lock_guard g(canonical_team_state_callback_mutex); + canonical_team_state_callback = std::move(cb); +} + +static void apply_canonical_team_state(const phosg::JSON& canonical_team_state) { + CanonicalTeamStateCallback cb; + { + std::lock_guard g(canonical_team_state_callback_mutex); + cb = canonical_team_state_callback; + } + if (cb) { + cb(canonical_team_state); + } +} + // This helper only builds empty event batches from trusted config fields. // Do not extend the format-string JSON path for player-controlled data; use a // real JSON object/serializer when team chat events are added. @@ -467,6 +487,11 @@ static asio::awaitable run_empty_exchange_once(const Config& cfg) { bool truncated = response.get_bool("truncated", false); size_t inbound_events = response.get("events", phosg::JSON::list()).as_list().size(); + const auto& canonical_team_state = response.get("canonical_team_state", phosg::JSON::dict()); + if (!canonical_team_state.as_dict().empty()) { + apply_canonical_team_state(canonical_team_state); + } + { std::lock_guard g(exchange_state_mutex); diff --git a/src/TeamSync.hh b/src/TeamSync.hh index 6dbfca2d..bb2d257b 100644 --- a/src/TeamSync.hh +++ b/src/TeamSync.hh @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -32,6 +33,9 @@ void configure_from_json(const phosg::JSON& json); bool enabled(); bool relay_team_chat_enabled(); +using CanonicalTeamStateCallback = std::function; +void set_canonical_team_state_callback(CanonicalTeamStateCallback cb); + void start_exchange_task(asio::io_context& io_context); } // namespace TeamSync From 7526176bb379009a7669a6709550801b9b912e9c Mon Sep 17 00:00:00 2001 From: James Osborne Date: Fri, 12 Jun 2026 01:02:13 -0400 Subject: [PATCH 06/11] Add TeamSync outbound team create queue --- src/ReceiveCommands.cc | 3 ++ src/TeamSync.cc | 97 ++++++++++++++++++++++++++++++++++++++---- src/TeamSync.hh | 3 ++ 3 files changed, 94 insertions(+), 9 deletions(-) diff --git a/src/ReceiveCommands.cc b/src/ReceiveCommands.cc index 326b95af..2f640f12 100644 --- a/src/ReceiveCommands.cc +++ b/src/ReceiveCommands.cc @@ -26,6 +26,7 @@ #include "SendCommands.hh" #include "StaticGameData.hh" #include "Text.hh" +#include "TeamSync.hh" #include "BrutalPeeps.hh" #include "AccountSync.hh" @@ -6087,6 +6088,8 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag c->login->account->bb_team_id = team->team_id; c->login->account->save(); + TeamSync::enqueue_team_create(team_name, c->login->account->account_id, player_name); + send_command(c, 0x02EA, 0x00000000); send_team_metadata_change_notifications(s, team, c->login->account->account_id, TeamMetadataChange::TEAM_CREATED); } diff --git a/src/TeamSync.cc b/src/TeamSync.cc index e57dce0d..838cc656 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -30,8 +31,20 @@ struct ExchangeState { uint64_t consecutive_failures = 0; }; +struct OutboundEvent { + uint64_t seq = 0; + std::string type; + + std::string team_name; + uint32_t creator_account_id = 0; + std::string creator_name; +}; + +static constexpr size_t MAX_OUTBOUND_EVENTS = 256; + static std::mutex exchange_state_mutex; static ExchangeState exchange_state; +static std::deque outbound_events; static std::mutex canonical_team_state_callback_mutex; static CanonicalTeamStateCallback canonical_team_state_callback; @@ -435,6 +448,37 @@ bool relay_team_chat_enabled() { return cfg.enabled && cfg.relay_team_chat; } +bool relay_team_actions_enabled() { + auto cfg = get_config(); + return cfg.enabled && cfg.relay_team_actions; +} + +void enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name) { + auto cfg = get_config(); + if (!cfg.enabled || !cfg.relay_team_actions) { + return; + } + if (team_name.empty() || creator_account_id == 0 || creator_name.empty()) { + return; + } + + std::lock_guard g(exchange_state_mutex); + if (outbound_events.size() >= MAX_OUTBOUND_EVENTS) { + std::fprintf(stderr, + "[TeamSync] warning outbound_event_dropped reason=queue_full source=%s team_namespace=%s type=team_create\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str()); + return; + } + + OutboundEvent ev; + ev.type = "team_create"; + ev.team_name = team_name; + ev.creator_account_id = creator_account_id; + ev.creator_name = creator_name; + outbound_events.emplace_back(std::move(ev)); +} + void set_canonical_team_state_callback(CanonicalTeamStateCallback cb) { std::lock_guard g(canonical_team_state_callback_mutex); canonical_team_state_callback = std::move(cb); @@ -451,18 +495,47 @@ static void apply_canonical_team_state(const phosg::JSON& canonical_team_state) } } -// This helper only builds empty event batches from trusted config fields. -// Do not extend the format-string JSON path for player-controlled data; use a -// real JSON object/serializer when team chat events are added. -static std::string exchange_body_for_empty_events(const Config& cfg, const ExchangeState& state) { - if (state.bootstrapped) { +static std::string exchange_body_for_current_state(const Config& cfg) { + std::lock_guard g(exchange_state_mutex); + + std::string events_json = "[]"; + if (exchange_state.bootstrapped) { + std::string parts; + bool first = true; + + for (auto& ev : outbound_events) { + if (ev.seq == 0) { + ev.seq = exchange_state.next_seq++; + } + + if (!first) { + parts += ","; + } + first = false; + + if (ev.type == "team_create") { + parts += std::format( + "{{\"seq\":{},\"type\":\"team_create\",\"team_namespace\":\"{}\",\"creator_account_id\":{},\"creator_name\":\"{}\",\"team_name\":\"{}\"}}", + ev.seq, + json_escape(cfg.team_namespace), + ev.creator_account_id, + json_escape(ev.creator_name), + json_escape(ev.team_name)); + } + } + + events_json = "[" + parts + "]"; + } + + if (exchange_state.bootstrapped) { return std::format( - "{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"team_namespace\":\"{}\",\"cursor\":{},\"events\":[]}}", + "{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"team_namespace\":\"{}\",\"cursor\":{},\"events\":{}}}", json_escape(source_label(cfg)), json_escape(cfg.source_region), json_escape(cfg.source_ship), json_escape(cfg.team_namespace), - state.cursor); + exchange_state.cursor, + events_json); } return std::format( @@ -474,8 +547,7 @@ static std::string exchange_body_for_empty_events(const Config& cfg, const Excha } static asio::awaitable run_empty_exchange_once(const Config& cfg) { - auto before_state = get_exchange_state(); - std::string body = exchange_body_for_empty_events(cfg, before_state); + std::string body = exchange_body_for_current_state(cfg); phosg::JSON response = co_await post_json_with_timeout(cfg, "/team-sync/exchange", body); if (!response.get_bool("ok", false)) { @@ -506,6 +578,9 @@ static asio::awaitable run_empty_exchange_once(const Config& cfg) { static_cast(next_cursor), static_cast(exchange_state.next_seq), static_cast(ack_max_seq)); + for (auto& ev : outbound_events) { + ev.seq = 0; + } exchange_state = ExchangeState(); co_return; } @@ -521,6 +596,10 @@ static asio::awaitable run_empty_exchange_once(const Config& cfg) { static_cast(exchange_state.next_seq), static_cast(next_cursor)); } + while (!outbound_events.empty() && outbound_events.front().seq && (outbound_events.front().seq <= ack_max_seq)) { + outbound_events.pop_front(); + } + exchange_state.cursor = next_cursor; exchange_state.consecutive_failures = 0; } diff --git a/src/TeamSync.hh b/src/TeamSync.hh index bb2d257b..5686ff4e 100644 --- a/src/TeamSync.hh +++ b/src/TeamSync.hh @@ -32,6 +32,9 @@ void configure_from_json(const phosg::JSON& json); bool enabled(); bool relay_team_chat_enabled(); +bool relay_team_actions_enabled(); + +void enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name); using CanonicalTeamStateCallback = std::function; void set_canonical_team_state_callback(CanonicalTeamStateCallback cb); From 44650179f0807ea31fb5246d69a8d454632e237a Mon Sep 17 00:00:00 2001 From: James Osborne Date: Fri, 12 Jun 2026 01:07:12 -0400 Subject: [PATCH 07/11] Route TeamSync team creation through coordinator --- src/ReceiveCommands.cc | 26 +++++++++++++++++++++++--- src/TeamSync.cc | 40 ++++++++++++++++++++++++++++++++++++---- src/TeamSync.hh | 3 ++- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/ReceiveCommands.cc b/src/ReceiveCommands.cc index 2f640f12..4eda4ce4 100644 --- a/src/ReceiveCommands.cc +++ b/src/ReceiveCommands.cc @@ -6084,12 +6084,32 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag send_command(c, 0x02EA, 0x00000001); } else { std::string player_name = c->character_file()->disp.visual.name.decode(c->language()); - auto team = s->team_index->create(team_name, c->login->account->account_id, player_name); + std::shared_ptr team; + + if (TeamSync::relay_team_actions_enabled()) { + if (!TeamSync::enqueue_team_create(team_name, c->login->account->account_id, player_name)) { + send_command(c, 0x02EA, 0x00000001); + break; + } + + if (!co_await TeamSync::exchange_once_now()) { + send_command(c, 0x02EA, 0x00000001); + break; + } + + team = s->team_index->get_by_account_id(c->login->account->account_id); + if (!team || (team->name != team_name)) { + send_command(c, 0x02EA, 0x00000001); + break; + } + + } else { + team = s->team_index->create(team_name, c->login->account->account_id, player_name); + } + c->login->account->bb_team_id = team->team_id; c->login->account->save(); - TeamSync::enqueue_team_create(team_name, c->login->account->account_id, player_name); - send_command(c, 0x02EA, 0x00000000); send_team_metadata_change_notifications(s, team, c->login->account->account_id, TeamMetadataChange::TEAM_CREATED); } diff --git a/src/TeamSync.cc b/src/TeamSync.cc index 838cc656..f96126a7 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -453,13 +453,13 @@ bool relay_team_actions_enabled() { return cfg.enabled && cfg.relay_team_actions; } -void enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name) { +bool enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name) { auto cfg = get_config(); if (!cfg.enabled || !cfg.relay_team_actions) { - return; + return false; } if (team_name.empty() || creator_account_id == 0 || creator_name.empty()) { - return; + return false; } std::lock_guard g(exchange_state_mutex); @@ -468,7 +468,7 @@ void enqueue_team_create(const std::string& team_name, uint32_t creator_account_ "[TeamSync] warning outbound_event_dropped reason=queue_full source=%s team_namespace=%s type=team_create\n", source_label(cfg).c_str(), cfg.team_namespace.c_str()); - return; + return false; } OutboundEvent ev; @@ -477,6 +477,7 @@ void enqueue_team_create(const std::string& team_name, uint32_t creator_account_ ev.creator_account_id = creator_account_id; ev.creator_name = creator_name; outbound_events.emplace_back(std::move(ev)); + return true; } void set_canonical_team_state_callback(CanonicalTeamStateCallback cb) { @@ -617,6 +618,37 @@ static asio::awaitable run_empty_exchange_once(const Config& cfg) { co_return; } + +asio::awaitable exchange_once_now() { + auto cfg = get_config(); + if (!exchange_enabled(cfg) || cfg.coordinator_url.empty()) { + co_return false; + } + + try { + bool was_bootstrapped = get_exchange_state().bootstrapped; + co_await run_empty_exchange_once(cfg); + + // First exchange after startup only bootstraps cursor/seq. Run once more so + // newly queued team_create events are actually sent before the client gets + // success. + if (!was_bootstrapped) { + co_await run_empty_exchange_once(cfg); + } + + co_return true; + + } catch (const std::exception& e) { + std::fprintf(stderr, + "[TeamSync] warning exchange_once_failed source=%s team_namespace=%s error=%s\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str(), + e.what()); + co_return false; + } +} + + static asio::awaitable exchange_task() { for (;;) { auto cfg = get_config(); diff --git a/src/TeamSync.hh b/src/TeamSync.hh index 5686ff4e..ee3d33d8 100644 --- a/src/TeamSync.hh +++ b/src/TeamSync.hh @@ -34,7 +34,8 @@ bool enabled(); bool relay_team_chat_enabled(); bool relay_team_actions_enabled(); -void enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name); +bool enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name); +asio::awaitable exchange_once_now(); using CanonicalTeamStateCallback = std::function; void set_canonical_team_state_callback(CanonicalTeamStateCallback cb); From b578c1cbbe11836e83581d6a0ade4dd0535a828c Mon Sep 17 00:00:00 2001 From: James Osborne Date: Fri, 12 Jun 2026 01:23:59 -0400 Subject: [PATCH 08/11] Apply TeamSync team membership to account records --- src/ServerState.cc | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/ServerState.cc b/src/ServerState.cc index b8dedf6d..c66fdb42 100644 --- a/src/ServerState.cc +++ b/src/ServerState.cc @@ -1650,12 +1650,48 @@ void ServerState::load_teams() { this->team_index = std::make_shared("system/teams", this->team_reward_defs_json); TeamSync::set_canonical_team_state_callback([this](const phosg::JSON& canonical_team_state) -> void { - if (!this->team_index) { + if (!this->team_index || !this->account_index) { return; } + try { this->team_index->replace_all_from_authority(canonical_team_state); - config_log.info_f("Applied canonical TeamSync team state"); + + std::unordered_map account_id_to_team_id; + const auto& teams_json = canonical_team_state.get("teams", phosg::JSON::list()).as_list(); + for (const auto& team_json_p : teams_json) { + const auto& team_json = *team_json_p; + uint32_t team_id = team_json.get_int("team_id", 0); + if (!team_id) { + continue; + } + + const auto& members_json = team_json.get("members", phosg::JSON::list()).as_list(); + for (const auto& member_json_p : members_json) { + const auto& member_json = *member_json_p; + uint32_t account_id = member_json.get_int("account_id", member_json.get_int("AccountID", 0)); + if (account_id) { + account_id_to_team_id.emplace(account_id, team_id); + } + } + } + + size_t account_updates = 0; + for (const auto& account : this->account_index->all()) { + auto it = account_id_to_team_id.find(account->account_id); + uint32_t authority_team_id = (it == account_id_to_team_id.end()) ? 0 : it->second; + if (account->bb_team_id != authority_team_id) { + account->bb_team_id = authority_team_id; + account->save(); + account_updates++; + } + } + + config_log.info_f( + "Applied canonical TeamSync team state (teams={}, account_updates={})", + teams_json.size(), + account_updates); + } catch (const std::exception& e) { config_log.warning_f("Failed to apply canonical TeamSync team state: {}", e.what()); } From 21bceac1e3e687926d1069883f7d31dc666b4a09 Mon Sep 17 00:00:00 2001 From: James Osborne Date: Fri, 12 Jun 2026 01:26:18 -0400 Subject: [PATCH 09/11] Route TeamSync member mutations through coordinator --- src/ReceiveCommands.cc | 65 +++++++++++++++++++++++++++++++------- src/TeamSync.cc | 72 ++++++++++++++++++++++++++++++++++++++++++ src/TeamSync.hh | 2 ++ 3 files changed, 128 insertions(+), 11 deletions(-) diff --git a/src/ReceiveCommands.cc b/src/ReceiveCommands.cc index 4eda4ce4..87e233a8 100644 --- a/src/ReceiveCommands.cc +++ b/src/ReceiveCommands.cc @@ -6139,16 +6139,39 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag send_command(added_c, 0x04EA, 0x00000005); } else { - added_c->login->account->bb_team_id = team->team_id; - added_c->login->account->save(); - s->team_index->add_member( - team->team_id, - added_c->login->account->account_id, - added_c->character_file()->disp.visual.name.decode(added_c->language())); + const uint32_t added_account_id = added_c->login->account->account_id; + const std::string added_name = added_c->character_file()->disp.visual.name.decode(added_c->language()); + + if (TeamSync::relay_team_actions_enabled()) { + if (!TeamSync::enqueue_team_member_add(team->team_id, added_account_id, added_name)) { + send_command(c, 0x04EA, 0x00000006); + send_command(added_c, 0x04EA, 0x00000006); + break; + } + + if (!co_await TeamSync::exchange_once_now()) { + send_command(c, 0x04EA, 0x00000006); + send_command(added_c, 0x04EA, 0x00000006); + break; + } + + auto refreshed_added_team = s->team_index->get_by_account_id(added_account_id); + if (!refreshed_added_team || refreshed_added_team->team_id != team->team_id) { + send_command(c, 0x04EA, 0x00000006); + send_command(added_c, 0x04EA, 0x00000006); + break; + } + + } else { + added_c->login->account->bb_team_id = team->team_id; + added_c->login->account->save(); + s->team_index->add_member(team->team_id, added_account_id, added_name); + } + send_command(c, 0x04EA, 0x00000000); send_command(added_c, 0x04EA, 0x00000000); send_team_metadata_change_notifications( - s, team, added_c->login->account->account_id, TeamMetadataChange::TEAM_MEMBER_COUNT); + s, team, added_account_id, TeamMetadataChange::TEAM_MEMBER_COUNT); } } } @@ -6161,10 +6184,30 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag bool is_removing_self = (cmd.guild_card_number == c->login->account->account_id); if (is_removing_self || (team->members.at(c->login->account->account_id).privilege_level() >= 0x30)) { - s->team_index->remove_member(cmd.guild_card_number); - auto removed_account = s->account_index->from_account_id(cmd.guild_card_number); - removed_account->bb_team_id = 0; - removed_account->save(); + if (TeamSync::relay_team_actions_enabled()) { + if (!TeamSync::enqueue_team_member_remove(cmd.guild_card_number)) { + send_command(c, 0x06EA, 0x00000001); + break; + } + + if (!co_await TeamSync::exchange_once_now()) { + send_command(c, 0x06EA, 0x00000001); + break; + } + + auto removed_account = s->account_index->from_account_id(cmd.guild_card_number); + if (removed_account->bb_team_id != 0) { + send_command(c, 0x06EA, 0x00000001); + break; + } + + } else { + s->team_index->remove_member(cmd.guild_card_number); + auto removed_account = s->account_index->from_account_id(cmd.guild_card_number); + removed_account->bb_team_id = 0; + removed_account->save(); + } + send_command(c, 0x06EA, 0x00000000); std::shared_ptr removed_c; diff --git a/src/TeamSync.cc b/src/TeamSync.cc index f96126a7..821fc4a4 100644 --- a/src/TeamSync.cc +++ b/src/TeamSync.cc @@ -35,6 +35,10 @@ struct OutboundEvent { uint64_t seq = 0; std::string type; + uint32_t team_id = 0; + uint32_t account_id = 0; + std::string name; + std::string team_name; uint32_t creator_account_id = 0; std::string creator_name; @@ -480,6 +484,58 @@ bool enqueue_team_create(const std::string& team_name, uint32_t creator_account_ return true; } +bool enqueue_team_member_add(uint32_t team_id, uint32_t account_id, const std::string& name) { + auto cfg = get_config(); + if (!cfg.enabled || !cfg.relay_team_actions) { + return false; + } + if (team_id == 0 || account_id == 0 || name.empty()) { + return false; + } + + std::lock_guard g(exchange_state_mutex); + if (outbound_events.size() >= MAX_OUTBOUND_EVENTS) { + std::fprintf(stderr, + "[TeamSync] warning outbound_event_dropped reason=queue_full source=%s team_namespace=%s type=team_member_add\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str()); + return false; + } + + OutboundEvent ev; + ev.type = "team_member_add"; + ev.team_id = team_id; + ev.account_id = account_id; + ev.name = name; + outbound_events.emplace_back(std::move(ev)); + return true; +} + +bool enqueue_team_member_remove(uint32_t account_id) { + auto cfg = get_config(); + if (!cfg.enabled || !cfg.relay_team_actions) { + return false; + } + if (account_id == 0) { + return false; + } + + std::lock_guard g(exchange_state_mutex); + if (outbound_events.size() >= MAX_OUTBOUND_EVENTS) { + std::fprintf(stderr, + "[TeamSync] warning outbound_event_dropped reason=queue_full source=%s team_namespace=%s type=team_member_remove\n", + source_label(cfg).c_str(), + cfg.team_namespace.c_str()); + return false; + } + + OutboundEvent ev; + ev.type = "team_member_remove"; + ev.account_id = account_id; + outbound_events.emplace_back(std::move(ev)); + return true; +} + void set_canonical_team_state_callback(CanonicalTeamStateCallback cb) { std::lock_guard g(canonical_team_state_callback_mutex); canonical_team_state_callback = std::move(cb); @@ -522,6 +578,22 @@ static std::string exchange_body_for_current_state(const Config& cfg) { ev.creator_account_id, json_escape(ev.creator_name), json_escape(ev.team_name)); + + } else if (ev.type == "team_member_add") { + parts += std::format( + "{{\"seq\":{},\"type\":\"team_member_add\",\"team_namespace\":\"{}\",\"team_id\":{},\"account_id\":{},\"name\":\"{}\"}}", + ev.seq, + json_escape(cfg.team_namespace), + ev.team_id, + ev.account_id, + json_escape(ev.name)); + + } else if (ev.type == "team_member_remove") { + parts += std::format( + "{{\"seq\":{},\"type\":\"team_member_remove\",\"team_namespace\":\"{}\",\"account_id\":{}}}", + ev.seq, + json_escape(cfg.team_namespace), + ev.account_id); } } diff --git a/src/TeamSync.hh b/src/TeamSync.hh index ee3d33d8..be9a47e2 100644 --- a/src/TeamSync.hh +++ b/src/TeamSync.hh @@ -35,6 +35,8 @@ bool relay_team_chat_enabled(); bool relay_team_actions_enabled(); bool enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name); +bool enqueue_team_member_add(uint32_t team_id, uint32_t account_id, const std::string& name); +bool enqueue_team_member_remove(uint32_t account_id); asio::awaitable exchange_once_now(); using CanonicalTeamStateCallback = std::function; From cf380e93d27467cc3719cfcf8a7dfa2a37b93f12 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 12 Jun 2026 13:02:06 -0400 Subject: [PATCH 10/11] Guard unsupported TeamSync team mutations --- src/ReceiveCommands.cc | 28 ++++++++++++++++++++++++++++ src/ReceiveSubcommands.cc | 7 +++++++ 2 files changed, 35 insertions(+) diff --git a/src/ReceiveCommands.cc b/src/ReceiveCommands.cc index 87e233a8..d027757e 100644 --- a/src/ReceiveCommands.cc +++ b/src/ReceiveCommands.cc @@ -6262,6 +6262,11 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag case 0x0FEA: { // Set team flag auto team = c->team(); if (team && team->members.at(c->login->account->account_id).check_flag(TeamIndex::Team::Member::Flag::IS_MASTER)) { + if (TeamSync::relay_team_actions_enabled()) { + // TeamSync phase 1 is membership-only. Do not mutate local-only team + // metadata that would be erased by the next authority apply. + break; + } const auto& cmd = check_size_t(msg.data); s->team_index->set_flag_data(team->team_id, cmd.flag_data); send_team_metadata_change_notifications(s, team, 0, TeamMetadataChange::FLAG_DATA); @@ -6271,6 +6276,12 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag case 0x10EA: { // Disband team auto team = c->team(); if (team && team->members.at(c->login->account->account_id).check_flag(TeamIndex::Team::Member::Flag::IS_MASTER)) { + if (TeamSync::relay_team_actions_enabled()) { + // TeamSync phase 1 is membership-only. Disband is not yet routed + // through the authority. + send_command(c, 0x10EA, 0x00000001); + break; + } s->team_index->disband(team->team_id); send_command(c, 0x10EA, 0x00000000); send_team_metadata_change_notifications(s, team, 0, TeamMetadataChange::TEAM_DISBANDED); @@ -6285,6 +6296,13 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag throw std::runtime_error("this command cannot be used to modify your own permissions"); } + if (TeamSync::relay_team_actions_enabled()) { + // TeamSync phase 1 is membership-only. Privilege/master changes are + // not yet routed through the authority. + send_command(c, 0x11EA, 0x00000005); + break; + } + // The client only sends this command with flag = 0x00, 0x30, or 0x40 switch (msg.flag) { case 0x00: // Demote member @@ -6342,6 +6360,12 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag throw std::runtime_error("team reward already purchased"); } + if (TeamSync::relay_team_actions_enabled()) { + // TeamSync phase 1 is membership-only. Rewards/points are not yet + // routed through the authority. + break; + } + s->team_index->buy_reward(team->team_id, reward.key, reward.team_points, reward.reward_flag); if (reward.reward_flag != TeamIndex::Team::RewardFlag::NONE) { @@ -6366,6 +6390,10 @@ static asio::awaitable on_EA_BB(std::shared_ptr c, Channel::Messag send_command(c, 0x1FEA, 0x00000001); } else if (s->team_index->get_by_name(new_team_name)) { send_command(c, 0x1FEA, 0x00000002); + } else if (TeamSync::relay_team_actions_enabled()) { + // TeamSync phase 1 is membership-only. Rename is not yet routed + // through the authority. + send_command(c, 0x1FEA, 0x00000001); } else { s->team_index->rename(team->team_id, new_team_name); send_command(c, 0x1FEA, 0x00000000); diff --git a/src/ReceiveSubcommands.cc b/src/ReceiveSubcommands.cc index 4ceb89a7..d2cdd4b1 100644 --- a/src/ReceiveSubcommands.cc +++ b/src/ReceiveSubcommands.cc @@ -22,6 +22,7 @@ #include "SendCommands.hh" #include "StaticGameData.hh" #include "Text.hh" +#include "TeamSync.hh" // The functions in this file are called when a client sends a game command (60, 62, 6C, 6D, C9, or CB). @@ -4785,6 +4786,12 @@ static void on_exchange_item_for_team_points_bb(std::shared_ptr c, Subco } auto s = c->require_server_state(); + if (TeamSync::relay_team_actions_enabled()) { + // TeamSync phase 1 is membership-only. Team points are not yet routed + // through the authority, so do not consume the item locally. + return; + } + auto p = c->character_file(); const auto& limits = *s->item_stack_limits(c->version()); auto item = p->remove_item(cmd.item_id, cmd.amount, limits); From 23015614edd6f0273a3bf6cc787673ac7842acd0 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 12 Jun 2026 14:39:07 -0400 Subject: [PATCH 11/11] Harden TeamSync canonical state application --- src/ServerState.cc | 60 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/src/ServerState.cc b/src/ServerState.cc index c66fdb42..18a22f3b 100644 --- a/src/ServerState.cc +++ b/src/ServerState.cc @@ -1649,33 +1649,82 @@ void ServerState::load_teams() { config_log.info_f("Indexing teams"); this->team_index = std::make_shared("system/teams", this->team_reward_defs_json); - TeamSync::set_canonical_team_state_callback([this](const phosg::JSON& canonical_team_state) -> void { + auto last_applied_team_sync_signature = std::make_shared(); + auto last_rejected_empty_team_sync_signature = std::make_shared(); + + TeamSync::set_canonical_team_state_callback([this, last_applied_team_sync_signature, last_rejected_empty_team_sync_signature]( + const phosg::JSON& canonical_team_state) -> void { if (!this->team_index || !this->account_index) { return; } try { - this->team_index->replace_all_from_authority(canonical_team_state); + const auto& teams_json = canonical_team_state.get("teams", phosg::JSON::list()).as_list(); + + std::string canonical_signature; + canonical_signature += "next_team_id=" + + std::to_string(canonical_team_state.get_int("next_team_id", canonical_team_state.get_int("NextTeamID", 0))) + "\n"; + canonical_signature += "teams=" + std::to_string(teams_json.size()) + "\n"; std::unordered_map account_id_to_team_id; - const auto& teams_json = canonical_team_state.get("teams", phosg::JSON::list()).as_list(); for (const auto& team_json_p : teams_json) { const auto& team_json = *team_json_p; - uint32_t team_id = team_json.get_int("team_id", 0); + uint32_t team_id = team_json.get_int("team_id", team_json.get_int("TeamID", 0)); if (!team_id) { continue; } + canonical_signature += "team=" + std::to_string(team_id) + "\n"; + const auto& members_json = team_json.get("members", phosg::JSON::list()).as_list(); for (const auto& member_json_p : members_json) { const auto& member_json = *member_json_p; uint32_t account_id = member_json.get_int("account_id", member_json.get_int("AccountID", 0)); + uint32_t flags = member_json.get_int("flags", member_json.get_int("Flags", 0)); + uint32_t points = member_json.get_int("points", member_json.get_int("Points", 0)); if (account_id) { account_id_to_team_id.emplace(account_id, team_id); + canonical_signature += "member=" + std::to_string(account_id) + ":" + + std::to_string(flags) + ":" + std::to_string(points) + "\n"; } } } + const size_t local_team_count = this->team_index->all().size(); + + size_t local_account_team_refs = 0; + for (const auto& account : this->account_index->all()) { + if (account->bb_team_id) { + local_account_team_refs++; + } + } + + if (teams_json.empty()) { + if (local_team_count || local_account_team_refs) { + std::string rejection_signature = canonical_signature + + "local_team_count=" + std::to_string(local_team_count) + + ";local_account_team_refs=" + std::to_string(local_account_team_refs); + if (*last_rejected_empty_team_sync_signature != rejection_signature) { + config_log.warning_f( + "Ignored empty canonical TeamSync team state because local state is non-empty " + "(local_teams={}, local_account_team_refs={})", + local_team_count, + local_account_team_refs); + *last_rejected_empty_team_sync_signature = std::move(rejection_signature); + } + return; + } + + *last_applied_team_sync_signature = std::move(canonical_signature); + return; + } + + if (*last_applied_team_sync_signature == canonical_signature) { + return; + } + + this->team_index->replace_all_from_authority(canonical_team_state); + size_t account_updates = 0; for (const auto& account : this->account_index->all()) { auto it = account_id_to_team_id.find(account->account_id); @@ -1687,6 +1736,9 @@ void ServerState::load_teams() { } } + *last_applied_team_sync_signature = std::move(canonical_signature); + last_rejected_empty_team_sync_signature->clear(); + config_log.info_f( "Applied canonical TeamSync team state (teams={}, account_updates={})", teams_json.size(),