From 0ae02b019178d61892b1d6cf79f18e693bb277c6 Mon Sep 17 00:00:00 2001 From: Martin Michelsen Date: Fri, 21 Jun 2024 10:57:12 -0700 Subject: [PATCH] add websocket endpoint for rare drop stream --- src/HTTPServer.cc | 242 +++++++++++++++++++++++++++++++++++++- src/HTTPServer.hh | 38 ++++++ src/Main.cc | 13 +- src/ReceiveSubcommands.cc | 24 +++- src/ServerState.hh | 2 + 5 files changed, 307 insertions(+), 12 deletions(-) diff --git a/src/HTTPServer.cc b/src/HTTPServer.cc index 6baf7d68..dfe9ad1a 100644 --- a/src/HTTPServer.cc +++ b/src/HTTPServer.cc @@ -132,8 +132,7 @@ unordered_multimap HTTPServer::parse_url_params(const string& qu } value.resize(write_offset); - params.emplace(piecewise_construct, forward_as_tuple(it, 0, first_equals), - forward_as_tuple(value)); + params.emplace(piecewise_construct, forward_as_tuple(it, 0, first_equals), forward_as_tuple(value)); } else { params.emplace(it, ""); } @@ -204,6 +203,233 @@ void HTTPServer::wait_for_stop() { this->th.join(); } +HTTPServer::WebsocketClient::WebsocketClient(struct evhttp_connection* conn) + : conn(conn), + bev(evhttp_connection_get_bufferevent(this->conn)), + pending_opcode(0xFF), + last_communication_time(now()) {} + +HTTPServer::WebsocketClient::~WebsocketClient() { + evhttp_connection_free(this->conn); +} + +void HTTPServer::WebsocketClient::reset_pending_frame() { + this->pending_opcode = 0xFF; + this->pending_data.clear(); +} + +shared_ptr HTTPServer::enable_websockets(struct evhttp_request* req) { + if (evhttp_request_get_command(req) != EVHTTP_REQ_GET) { + return nullptr; + } + + struct evkeyvalq* in_headers = evhttp_request_get_input_headers(req); + const char* connection_header = evhttp_find_header(in_headers, "Connection"); + if (!connection_header || strcasecmp(connection_header, "upgrade")) { + return nullptr; + } + const char* upgrade_header = evhttp_find_header(in_headers, "Upgrade"); + if (!upgrade_header || strcasecmp(upgrade_header, "websocket")) { + return nullptr; + } + const char* sec_websocket_key_header = evhttp_find_header(in_headers, "Sec-WebSocket-Key"); + if (!sec_websocket_key_header) { + return nullptr; + } + + // Note: it's important that we make a copy of this header's value since + // we're about to free the original + string sec_websocket_key = sec_websocket_key_header; + string sec_websocket_accept_data = sec_websocket_key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + string sec_websocket_accept = base64_encode(sha1(sec_websocket_accept_data)); + + // Hijack the bufferevent since it's no longer handling HTTP at all + struct evhttp_connection* conn = evhttp_request_get_connection(req); + struct bufferevent* bev = evhttp_connection_get_bufferevent(conn); + bufferevent_setcb(bev, &this->dispatch_on_websocket_read, NULL, &this->dispatch_on_websocket_error, this); + bufferevent_enable(bev, EV_READ | EV_WRITE); + + // Send the HTTP reply, which enables websockets + struct evbuffer* out_buf = bufferevent_get_output(bev); + evbuffer_add_printf(out_buf, "HTTP/1.1 101 Switching Protocols\r\n\ +Upgrade: websocket\r\n\ +Connection: upgrade\r\n\ +Sec-WebSocket-Accept: %s\r\n\ +\r\n", + sec_websocket_accept.c_str()); + + return this->bev_to_websocket_client.emplace(bev, new WebsocketClient(conn)).first->second; +} + +void HTTPServer::dispatch_on_websocket_read(struct bufferevent* bev, void* ctx) { + reinterpret_cast(ctx)->on_websocket_read(bev); +} + +void HTTPServer::dispatch_on_websocket_error(struct bufferevent* bev, short events, void* ctx) { + reinterpret_cast(ctx)->on_websocket_error(bev, events); +} + +void HTTPServer::on_websocket_read(struct bufferevent* bev) { + struct evbuffer* in_buf = bufferevent_get_input(bev); + + for (;;) { + // We need at most 10 bytes to determine if there's a valid frame, or as + // little as 2 + string header_data(10, '\0'); + ssize_t bytes_read = evbuffer_copyout(in_buf, const_cast(header_data.data()), header_data.size()); + + if (bytes_read < 2) { + break; // Full header not yet available + } + + // Get the payload size + bool has_mask = header_data[1] & 0x80; + size_t header_size = 2; + size_t payload_size = header_data[1] & 0x7F; + if (payload_size == 0x7F) { + if (bytes_read < 10) { + break; // Full 64-bit header not yet available + } + payload_size = bswap64(*reinterpret_cast(&header_data[2])); + header_size = 10; + } else if (payload_size == 0x7E) { + if (bytes_read < 4) { + break; // Full 16-bit size header not yet available + } + payload_size = bswap16(*reinterpret_cast(&header_data[2])); + header_size = 4; + } + if (evbuffer_get_length(in_buf) < header_size + payload_size) { + break; // Full message not yet available + } + + // Full message is available; skip the header bytes (we already read them) + // and read the masking key if needed + evbuffer_drain(in_buf, header_size); + uint8_t mask_key[4]; + if (has_mask) { + evbuffer_remove(in_buf, mask_key, 4); + } + + shared_ptr c = this->bev_to_websocket_client.at(bev); + c->last_communication_time = now(); + + // Read and unmask message data + string payload(payload_size, '\0'); + evbuffer_remove(in_buf, const_cast(payload.data()), payload_size); + if (has_mask) { + for (size_t x = 0; x < payload_size; x++) { + payload[x] ^= mask_key[x & 3]; + } + } + + // If the current message is a control message, respond appropriately + // (these can be sent in the middle of fragmented messages) + uint8_t opcode = header_data[0] & 0x0F; + if (opcode & 0x08) { + if (opcode == 0x0A) { + // Ping response; ignore it + + } else if (opcode == 0x08) { + // Close message + this->send_websocket_message(bev, payload, 0x08); + this->disconnect_websocket_client(bev); + + } else if (opcode == 0x09) { + // Ping message + this->send_websocket_message(bev, payload, 0x0A); + + } else { + // Unknown control message type + this->disconnect_websocket_client(bev); + } + break; + } + + // If there's an existing pending message, the current message's opcode + // should be zero; if there's no pending message, it must not be zero + if ((c->pending_opcode != 0xFF) == (opcode != 0)) { + this->disconnect_websocket_client(bev); + break; + } + + // At this point, we have read a full message; we must not break out of + // this loop in case there are further messages available. + + // Save the message opcode, if present, and append the frame data + if (opcode) { + c->pending_opcode = opcode; + } + c->pending_data += payload; + + // If the FIN bit is set, then the frame is complete - append the payload + // to any pending payloads and call the message handler. If the FIN bit + // isn't set, we need to receive at least one continuation frame to + // complete the message. + if (header_data[0] & 0x80) { + this->handle_websocket_message(c, c->pending_opcode, c->pending_data); + c->reset_pending_frame(); + } + } +} + +void HTTPServer::on_websocket_error(struct bufferevent* bev, short events) { + if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) { + this->disconnect_websocket_client(bev); + } +} + +void HTTPServer::disconnect_websocket_client(struct bufferevent* bev) { + auto it = this->bev_to_websocket_client.find(bev); + this->handle_websocket_disconnect(it->second); + this->bev_to_websocket_client.erase(it); +} + +void HTTPServer::send_websocket_message(struct bufferevent* bev, + const string& message, uint8_t opcode) { + string header; + header.push_back(0x80 | (opcode & 0x0F)); + if (message.size() > 65535) { + header.push_back(0x7F); + header.resize(10); + *reinterpret_cast(const_cast(header.data() + 2)) = bswap64(message.size()); + } else if (message.size() > 0x7D) { + header.push_back(0x7E); + header.resize(4); + *reinterpret_cast(const_cast(header.data() + 2)) = bswap16(message.size()); + } else { + header.push_back(message.size()); + } + + struct evbuffer* out_buf = bufferevent_get_output(bev); + evbuffer_add(out_buf, header.data(), header.size()); + evbuffer_add(out_buf, message.data(), message.size()); +} + +void HTTPServer::send_websocket_message(shared_ptr c, const string& message, uint8_t opcode) { + this->send_websocket_message(c->bev, message, opcode); +} + +void HTTPServer::handle_websocket_message(shared_ptr, uint8_t, const string&) { + // Currently we just ignore any messages from the client +} + +void HTTPServer::handle_websocket_disconnect(shared_ptr c) { + this->rare_drop_subscribers.erase(c); +} + +void HTTPServer::send_rare_drop_notification(shared_ptr message) { + forward_to_event_thread(this->base, [this, message]() -> void { + if (this->rare_drop_subscribers.empty()) { + return; + } + string serialized = message->serialize(); + for (const auto& c : this->rare_drop_subscribers) { + this->send_websocket_message(c, serialized); + } + }); +} + void HTTPServer::dispatch_handle_request(struct evhttp_request* req, void* ctx) { reinterpret_cast(ctx)->handle_request(req); } @@ -960,11 +1186,23 @@ void HTTPServer::handle_request(struct evhttp_request* req) { "/y/proxy-clients", "/y/lobbies", "/y/server", + "/y/rare-drops/stream", "/y/summary", "/y/all", }); ret = make_shared(JSON::dict({{"endpoints", std::move(endpoints_json)}})); + } else if (uri == "/y/rare-drops/stream") { + auto c = this->enable_websockets(req); + if (!c) { + throw http_error(400, "this path requires a websocket connection"); + } else { + this->rare_drop_subscribers.emplace(c); + auto version_message = JSON::dict({{"ServerType", "newserv"}}); + this->send_websocket_message(c, version_message.serialize()); + return; + } + } else if (uri == "/y/data/ep3-cards") { ret = make_shared(this->generate_ep3_cards_json(false)); } else if (uri == "/y/data/ep3-cards-trial") { diff --git a/src/HTTPServer.hh b/src/HTTPServer.hh index 164e623e..5e2fa4f3 100644 --- a/src/HTTPServer.hh +++ b/src/HTTPServer.hh @@ -28,6 +28,8 @@ public: void schedule_stop(); void wait_for_stop(); + void send_rare_drop_notification(std::shared_ptr message); + protected: class http_error : public std::runtime_error { public: @@ -35,11 +37,47 @@ protected: int code; }; + struct WebsocketClient { + struct evhttp_connection* conn; + struct bufferevent* bev; + + uint8_t pending_opcode; + std::string pending_data; + + uint64_t last_communication_time; + + void* context; + + WebsocketClient(struct evhttp_connection* conn); + ~WebsocketClient(); + + void reset_pending_frame(); + }; + std::shared_ptr state; std::shared_ptr base; std::shared_ptr http; std::thread th; + std::unordered_set> rare_drop_subscribers; + + std::unordered_map> bev_to_websocket_client; + + std::shared_ptr enable_websockets(struct evhttp_request* req); + + static void dispatch_on_websocket_read(struct bufferevent* bev, void* ctx); + static void dispatch_on_websocket_error(struct bufferevent* bev, short events, void* ctx); + + void on_websocket_read(struct bufferevent* bev); + void on_websocket_error(struct bufferevent* bev, short events); + + void disconnect_websocket_client(struct bufferevent* bev); + void send_websocket_message(struct bufferevent* bev, const std::string& message, uint8_t opcode = 0x01); + void send_websocket_message(std::shared_ptr c, const std::string& message, uint8_t opcode = 0x01); + + virtual void handle_websocket_message(std::shared_ptr c, uint8_t opcode, const std::string& message); + virtual void handle_websocket_disconnect(std::shared_ptr c); + void thread_fn(); static void dispatch_handle_request(struct evhttp_request* req, void* ctx); diff --git a/src/Main.cc b/src/Main.cc index ad2e1d1c..99edf215 100644 --- a/src/Main.cc +++ b/src/Main.cc @@ -2543,7 +2543,6 @@ Action a_run_server_replay_log( shared_ptr shell; shared_ptr replay_session; - shared_ptr http_server; if (is_replay) { config_log.info("Starting proxy server"); state->proxy_server = make_shared(base, state); @@ -2653,10 +2652,10 @@ Action a_run_server_replay_log( if (!state->http_addresses.empty() || !state->http_addresses.empty()) { config_log.info("Starting HTTP server"); - http_server = make_shared(state); + state->http_server = make_shared(state); for (const auto& it : state->http_addresses) { auto netloc = parse_netloc(it); - http_server->listen(netloc.first, netloc.second); + state->http_server->listen(netloc.first, netloc.second); } } } @@ -2700,8 +2699,8 @@ Action a_run_server_replay_log( if (state->bb_patch_server) { state->bb_patch_server->schedule_stop(); } - if (http_server) { - http_server->schedule_stop(); + if (state->http_server) { + state->http_server->schedule_stop(); } if (state->pc_patch_server) { config_log.info("Waiting for PC_V2 patch server to stop"); @@ -2711,9 +2710,9 @@ Action a_run_server_replay_log( config_log.info("Waiting for BB_V4 patch server to stop"); state->bb_patch_server->wait_for_stop(); } - if (http_server) { + if (state->http_server) { config_log.info("Waiting for HTTP server to stop"); - http_server->wait_for_stop(); + state->http_server->wait_for_stop(); } state->proxy_server.reset(); // Break reference cycle }); diff --git a/src/ReceiveSubcommands.cc b/src/ReceiveSubcommands.cc index 3eeef678..ce1eb281 100644 --- a/src/ReceiveSubcommands.cc +++ b/src/ReceiveSubcommands.cc @@ -10,6 +10,7 @@ #include "Client.hh" #include "Compression.hh" +#include "HTTPServer.hh" #include "Items.hh" #include "Lobby.hh" #include "Loggers.hh" @@ -2072,9 +2073,26 @@ static void on_pick_up_item_generic( if (should_send_game_notif || should_send_global_notif) { string p_name = p->disp.name.decode(); - string desc = s->describe_item(c->version(), fi->data, true); - string message = string_printf("$C6%s$C7 found\n%s", p_name.c_str(), desc.c_str()); - string bb_message = string_printf("$C6%s$C7 has found %s", p_name.c_str(), desc.c_str()); + string desc_ingame = s->describe_item(c->version(), fi->data, true); + string desc_http = s->describe_item(c->version(), fi->data, false); + + if (s->http_server) { + auto message = make_shared(JSON::dict({ + {"PlayerAccountID", c->login->account->account_id}, + {"PlayerName", p_name}, + {"PlayerVersion", name_for_enum(c->version())}, + {"GameName", l->name}, + {"GameDropMode", name_for_enum(l->drop_mode)}, + {"ItemData", fi->data.hex()}, + {"ItemDescription", desc_http}, + {"NotifyGame", should_send_game_notif}, + {"NotifyServer", should_send_global_notif}, + })); + s->http_server->send_rare_drop_notification(message); + } + + string message = string_printf("$C6%s$C7 found\n%s", p_name.c_str(), desc_ingame.c_str()); + string bb_message = string_printf("$C6%s$C7 has found %s", p_name.c_str(), desc_ingame.c_str()); if (should_send_global_notif) { for (auto& it : s->channel_to_client) { if (it.second->login && diff --git a/src/ServerState.hh b/src/ServerState.hh index 990ae226..19e8b12c 100644 --- a/src/ServerState.hh +++ b/src/ServerState.hh @@ -36,6 +36,7 @@ class ProxyServer; class Server; class IPStackSimulator; +class HTTPServer; struct PortConfiguration { std::string name; @@ -270,6 +271,7 @@ struct ServerState : public std::enable_shared_from_this { std::shared_ptr game_server; std::shared_ptr pc_patch_server; std::shared_ptr bb_patch_server; + std::shared_ptr http_server; explicit ServerState(const std::string& config_filename = ""); ServerState(std::shared_ptr base, const std::string& config_filename, bool is_replay);