work around data race during game join
This commit is contained in:
+73
-25
@@ -132,6 +132,62 @@ static void on_forward_check_game(shared_ptr<Client> c, uint8_t command, uint8_t
|
||||
forward_subcommand(c, command, flag, data, size);
|
||||
}
|
||||
|
||||
template <typename CmdT>
|
||||
static void send_or_enqueue_joining_player_command(shared_ptr<Client> c, uint8_t command, uint8_t flag, const CmdT& data) {
|
||||
if (c->game_join_command_queue) {
|
||||
c->log.info("Client not ready to receive join commands; adding to queue");
|
||||
auto cmd = c->game_join_command_queue->emplace_back();
|
||||
cmd.command = command;
|
||||
cmd.flag = flag;
|
||||
cmd.data.assign(reinterpret_cast<const char*>(&data), sizeof(data));
|
||||
} else {
|
||||
send_command(c, command, flag, &data, sizeof(data));
|
||||
}
|
||||
}
|
||||
|
||||
static void send_or_enqueue_joining_player_command(shared_ptr<Client> c, uint8_t command, uint8_t flag, string&& data) {
|
||||
if (c->game_join_command_queue) {
|
||||
c->log.info("Client not ready to receive join commands; adding to queue");
|
||||
auto cmd = c->game_join_command_queue->emplace_back();
|
||||
cmd.command = command;
|
||||
cmd.flag = flag;
|
||||
cmd.data = std::move(data);
|
||||
} else {
|
||||
send_command(c, command, flag, data.data(), data.size());
|
||||
}
|
||||
}
|
||||
|
||||
static void send_or_enqueue_joining_player_command(shared_ptr<Client> c, uint8_t command, uint8_t flag, const void* data, size_t size) {
|
||||
if (c->game_join_command_queue) {
|
||||
c->log.info("Client not ready to receive join commands; adding to queue");
|
||||
auto cmd = c->game_join_command_queue->emplace_back();
|
||||
cmd.command = command;
|
||||
cmd.flag = flag;
|
||||
cmd.data.assign(reinterpret_cast<const char*>(data), size);
|
||||
} else {
|
||||
send_command(c, command, flag, data, size);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_forward_check_game_loading(shared_ptr<Client> c, uint8_t command, uint8_t flag, const void* data, size_t size) {
|
||||
auto l = c->require_lobby();
|
||||
if (!l->is_game() || !l->any_client_loading()) {
|
||||
return;
|
||||
}
|
||||
if (command_is_private(command)) {
|
||||
auto target = l->clients.at(flag);
|
||||
if (target) {
|
||||
send_or_enqueue_joining_player_command(target, command, flag, data, size);
|
||||
}
|
||||
} else {
|
||||
for (auto lc : l->clients) {
|
||||
if (lc) {
|
||||
send_or_enqueue_joining_player_command(lc, command, flag, data, size);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void on_forward_sync_joining_player_state(shared_ptr<Client> c, uint8_t command, uint8_t flag, const void* data, size_t size) {
|
||||
auto l = c->require_lobby();
|
||||
if (!l->is_game() || !l->any_client_loading()) {
|
||||
@@ -150,7 +206,7 @@ static void on_forward_sync_joining_player_state(shared_ptr<Client> c, uint8_t c
|
||||
print_data(stderr, decompressed);
|
||||
}
|
||||
|
||||
forward_subcommand(c, command, flag, data, size);
|
||||
on_forward_check_game_loading(c, command, flag, data, size);
|
||||
}
|
||||
|
||||
static void on_sync_joining_player_item_state(shared_ptr<Client> c, uint8_t command, uint8_t flag, const void* data, size_t size) {
|
||||
@@ -164,26 +220,26 @@ static void on_sync_joining_player_item_state(shared_ptr<Client> c, uint8_t comm
|
||||
if (!command_is_private(command)) {
|
||||
throw runtime_error("6x6D sent via public command");
|
||||
}
|
||||
if (flag >= l->max_clients) {
|
||||
return;
|
||||
}
|
||||
auto target = l->clients[flag];
|
||||
if (!target) {
|
||||
return;
|
||||
}
|
||||
|
||||
// For non-V3 versions, just forward the data verbatim. For V3, we need to
|
||||
// byteswap mags' data2 fields if exactly one of the sender and recipient is
|
||||
// PSO GC
|
||||
bool sender_is_gc = (c->version() == GameVersion::GC);
|
||||
if (!sender_is_gc && (c->version() != GameVersion::XB)) {
|
||||
forward_subcommand(c, command, flag, data, size);
|
||||
send_or_enqueue_joining_player_command(target, command, flag, data, size);
|
||||
|
||||
} else {
|
||||
if (flag >= l->max_clients) {
|
||||
return;
|
||||
}
|
||||
auto target = l->clients[flag];
|
||||
if (!target) {
|
||||
return;
|
||||
}
|
||||
bool target_is_gc = (target->version() == GameVersion::GC);
|
||||
|
||||
if (target_is_gc == sender_is_gc) {
|
||||
send_command(target, command, flag, data, size);
|
||||
send_or_enqueue_joining_player_command(target, command, flag, data, size);
|
||||
|
||||
} else {
|
||||
const auto& cmd = check_size_t<G_SyncGameStateHeader_6x6B_6x6C_6x6D_6x6E>(data, size, 0xFFFF);
|
||||
@@ -239,10 +295,10 @@ static void on_sync_joining_player_item_state(shared_ptr<Client> c, uint8_t comm
|
||||
c->log.info("Byteswapped and recompressed item sync data (%zX bytes)", out_compressed_data.size());
|
||||
}
|
||||
|
||||
vector<pair<const void*, size_t>> blocks;
|
||||
blocks.emplace_back(make_pair(&out_cmd, sizeof(out_cmd)));
|
||||
blocks.emplace_back(make_pair(out_compressed_data.data(), out_compressed_data.size()));
|
||||
send_command(target, command, flag, blocks);
|
||||
StringWriter w;
|
||||
w.write(&out_cmd, sizeof(out_cmd));
|
||||
w.write(out_compressed_data);
|
||||
send_or_enqueue_joining_player_command(target, command, flag, std::move(w.str()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -284,7 +340,7 @@ static void on_sync_joining_player_disp_and_inventory(
|
||||
bool sender_is_gc = (c->version() == GameVersion::GC);
|
||||
bool target_is_gc = (target->version() == GameVersion::GC);
|
||||
if (target_is_gc == sender_is_gc) {
|
||||
send_command(target, command, flag, data, size);
|
||||
send_or_enqueue_joining_player_command(target, command, flag, data, size);
|
||||
|
||||
} else if (sender_is_gc) {
|
||||
// Convert GC command to XB command
|
||||
@@ -300,7 +356,7 @@ static void on_sync_joining_player_disp_and_inventory(
|
||||
out_cmd.inventory.items[z].data.decode_for_version(GameVersion::GC);
|
||||
out_cmd.inventory.items[z].data.encode_for_version(GameVersion::XB, s->item_parameter_table_for_version(GameVersion::XB));
|
||||
}
|
||||
send_command_t(target, command, flag, out_cmd);
|
||||
send_or_enqueue_joining_player_command(target, command, flag, out_cmd);
|
||||
|
||||
} else {
|
||||
// Convert XB command to GC command
|
||||
@@ -312,18 +368,10 @@ static void on_sync_joining_player_disp_and_inventory(
|
||||
out_cmd.inventory.items[z].data.decode_for_version(GameVersion::XB);
|
||||
out_cmd.inventory.items[z].data.encode_for_version(GameVersion::GC, s->item_parameter_table_for_version(GameVersion::GC));
|
||||
}
|
||||
send_command(target, command, flag, &out_cmd, sizeof(G_SyncPlayerDispAndInventory_DC_PC_GC_6x70));
|
||||
send_or_enqueue_joining_player_command(target, command, flag, out_cmd);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_forward_check_game_loading(shared_ptr<Client> c, uint8_t command, uint8_t flag, const void* data, size_t size) {
|
||||
auto l = c->require_lobby();
|
||||
if (!l->is_game() || !l->any_client_loading()) {
|
||||
return;
|
||||
}
|
||||
forward_subcommand(c, command, flag, data, size);
|
||||
}
|
||||
|
||||
static void on_forward_check_size_client(shared_ptr<Client> c, uint8_t command, uint8_t flag, const void* data, size_t size) {
|
||||
const auto& cmd = check_size_t<G_ClientIDHeader>(data, size, 0xFFFF);
|
||||
if (cmd.client_id != c->lobby_client_id) {
|
||||
|
||||
Reference in New Issue
Block a user