Add TeamSync outbound team create queue
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
#include "SendCommands.hh"
|
||||
#include "StaticGameData.hh"
|
||||
#include "Text.hh"
|
||||
#include "TeamSync.hh"
|
||||
#include "BrutalPeeps.hh"
|
||||
#include "AccountSync.hh"
|
||||
|
||||
@@ -6087,6 +6088,8 @@ static asio::awaitable<void> on_EA_BB(std::shared_ptr<Client> c, Channel::Messag
|
||||
c->login->account->bb_team_id = team->team_id;
|
||||
c->login->account->save();
|
||||
|
||||
TeamSync::enqueue_team_create(team_name, c->login->account->account_id, player_name);
|
||||
|
||||
send_command(c, 0x02EA, 0x00000000);
|
||||
send_team_metadata_change_notifications(s, team, c->login->account->account_id, TeamMetadataChange::TEAM_CREATED);
|
||||
}
|
||||
|
||||
+88
-9
@@ -5,6 +5,7 @@
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <cctype>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
@@ -30,8 +31,20 @@ struct ExchangeState {
|
||||
uint64_t consecutive_failures = 0;
|
||||
};
|
||||
|
||||
struct OutboundEvent {
|
||||
uint64_t seq = 0;
|
||||
std::string type;
|
||||
|
||||
std::string team_name;
|
||||
uint32_t creator_account_id = 0;
|
||||
std::string creator_name;
|
||||
};
|
||||
|
||||
static constexpr size_t MAX_OUTBOUND_EVENTS = 256;
|
||||
|
||||
static std::mutex exchange_state_mutex;
|
||||
static ExchangeState exchange_state;
|
||||
static std::deque<OutboundEvent> outbound_events;
|
||||
|
||||
static std::mutex canonical_team_state_callback_mutex;
|
||||
static CanonicalTeamStateCallback canonical_team_state_callback;
|
||||
@@ -435,6 +448,37 @@ bool relay_team_chat_enabled() {
|
||||
return cfg.enabled && cfg.relay_team_chat;
|
||||
}
|
||||
|
||||
bool relay_team_actions_enabled() {
|
||||
auto cfg = get_config();
|
||||
return cfg.enabled && cfg.relay_team_actions;
|
||||
}
|
||||
|
||||
void enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name) {
|
||||
auto cfg = get_config();
|
||||
if (!cfg.enabled || !cfg.relay_team_actions) {
|
||||
return;
|
||||
}
|
||||
if (team_name.empty() || creator_account_id == 0 || creator_name.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> g(exchange_state_mutex);
|
||||
if (outbound_events.size() >= MAX_OUTBOUND_EVENTS) {
|
||||
std::fprintf(stderr,
|
||||
"[TeamSync] warning outbound_event_dropped reason=queue_full source=%s team_namespace=%s type=team_create\n",
|
||||
source_label(cfg).c_str(),
|
||||
cfg.team_namespace.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
OutboundEvent ev;
|
||||
ev.type = "team_create";
|
||||
ev.team_name = team_name;
|
||||
ev.creator_account_id = creator_account_id;
|
||||
ev.creator_name = creator_name;
|
||||
outbound_events.emplace_back(std::move(ev));
|
||||
}
|
||||
|
||||
void set_canonical_team_state_callback(CanonicalTeamStateCallback cb) {
|
||||
std::lock_guard<std::mutex> g(canonical_team_state_callback_mutex);
|
||||
canonical_team_state_callback = std::move(cb);
|
||||
@@ -451,18 +495,47 @@ static void apply_canonical_team_state(const phosg::JSON& canonical_team_state)
|
||||
}
|
||||
}
|
||||
|
||||
// This helper only builds empty event batches from trusted config fields.
|
||||
// Do not extend the format-string JSON path for player-controlled data; use a
|
||||
// real JSON object/serializer when team chat events are added.
|
||||
static std::string exchange_body_for_empty_events(const Config& cfg, const ExchangeState& state) {
|
||||
if (state.bootstrapped) {
|
||||
static std::string exchange_body_for_current_state(const Config& cfg) {
|
||||
std::lock_guard<std::mutex> g(exchange_state_mutex);
|
||||
|
||||
std::string events_json = "[]";
|
||||
if (exchange_state.bootstrapped) {
|
||||
std::string parts;
|
||||
bool first = true;
|
||||
|
||||
for (auto& ev : outbound_events) {
|
||||
if (ev.seq == 0) {
|
||||
ev.seq = exchange_state.next_seq++;
|
||||
}
|
||||
|
||||
if (!first) {
|
||||
parts += ",";
|
||||
}
|
||||
first = false;
|
||||
|
||||
if (ev.type == "team_create") {
|
||||
parts += std::format(
|
||||
"{{\"seq\":{},\"type\":\"team_create\",\"team_namespace\":\"{}\",\"creator_account_id\":{},\"creator_name\":\"{}\",\"team_name\":\"{}\"}}",
|
||||
ev.seq,
|
||||
json_escape(cfg.team_namespace),
|
||||
ev.creator_account_id,
|
||||
json_escape(ev.creator_name),
|
||||
json_escape(ev.team_name));
|
||||
}
|
||||
}
|
||||
|
||||
events_json = "[" + parts + "]";
|
||||
}
|
||||
|
||||
if (exchange_state.bootstrapped) {
|
||||
return std::format(
|
||||
"{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"team_namespace\":\"{}\",\"cursor\":{},\"events\":[]}}",
|
||||
"{{\"source\":\"{}\",\"source_region\":\"{}\",\"source_ship\":\"{}\",\"team_namespace\":\"{}\",\"cursor\":{},\"events\":{}}}",
|
||||
json_escape(source_label(cfg)),
|
||||
json_escape(cfg.source_region),
|
||||
json_escape(cfg.source_ship),
|
||||
json_escape(cfg.team_namespace),
|
||||
state.cursor);
|
||||
exchange_state.cursor,
|
||||
events_json);
|
||||
}
|
||||
|
||||
return std::format(
|
||||
@@ -474,8 +547,7 @@ static std::string exchange_body_for_empty_events(const Config& cfg, const Excha
|
||||
}
|
||||
|
||||
static asio::awaitable<void> run_empty_exchange_once(const Config& cfg) {
|
||||
auto before_state = get_exchange_state();
|
||||
std::string body = exchange_body_for_empty_events(cfg, before_state);
|
||||
std::string body = exchange_body_for_current_state(cfg);
|
||||
phosg::JSON response = co_await post_json_with_timeout(cfg, "/team-sync/exchange", body);
|
||||
|
||||
if (!response.get_bool("ok", false)) {
|
||||
@@ -506,6 +578,9 @@ static asio::awaitable<void> run_empty_exchange_once(const Config& cfg) {
|
||||
static_cast<unsigned long long>(next_cursor),
|
||||
static_cast<unsigned long long>(exchange_state.next_seq),
|
||||
static_cast<unsigned long long>(ack_max_seq));
|
||||
for (auto& ev : outbound_events) {
|
||||
ev.seq = 0;
|
||||
}
|
||||
exchange_state = ExchangeState();
|
||||
co_return;
|
||||
}
|
||||
@@ -521,6 +596,10 @@ static asio::awaitable<void> run_empty_exchange_once(const Config& cfg) {
|
||||
static_cast<unsigned long long>(exchange_state.next_seq),
|
||||
static_cast<unsigned long long>(next_cursor));
|
||||
}
|
||||
while (!outbound_events.empty() && outbound_events.front().seq && (outbound_events.front().seq <= ack_max_seq)) {
|
||||
outbound_events.pop_front();
|
||||
}
|
||||
|
||||
exchange_state.cursor = next_cursor;
|
||||
exchange_state.consecutive_failures = 0;
|
||||
}
|
||||
|
||||
@@ -32,6 +32,9 @@ void configure_from_json(const phosg::JSON& json);
|
||||
|
||||
bool enabled();
|
||||
bool relay_team_chat_enabled();
|
||||
bool relay_team_actions_enabled();
|
||||
|
||||
void enqueue_team_create(const std::string& team_name, uint32_t creator_account_id, const std::string& creator_name);
|
||||
|
||||
using CanonicalTeamStateCallback = std::function<void(const phosg::JSON&)>;
|
||||
void set_canonical_team_state_callback(CanonicalTeamStateCallback cb);
|
||||
|
||||
Reference in New Issue
Block a user