add websocket endpoint for rare drop stream

This commit is contained in:
Martin Michelsen
2024-06-21 10:57:12 -07:00
parent c0ea976fdc
commit 0ae02b0191
5 changed files with 307 additions and 12 deletions
+240 -2
View File
@@ -132,8 +132,7 @@ unordered_multimap<string, string> HTTPServer::parse_url_params(const string& qu
}
value.resize(write_offset);
params.emplace(piecewise_construct, forward_as_tuple(it, 0, first_equals),
forward_as_tuple(value));
params.emplace(piecewise_construct, forward_as_tuple(it, 0, first_equals), forward_as_tuple(value));
} else {
params.emplace(it, "");
}
@@ -204,6 +203,233 @@ void HTTPServer::wait_for_stop() {
this->th.join();
}
HTTPServer::WebsocketClient::WebsocketClient(struct evhttp_connection* conn)
: conn(conn),
bev(evhttp_connection_get_bufferevent(this->conn)),
pending_opcode(0xFF),
last_communication_time(now()) {}
HTTPServer::WebsocketClient::~WebsocketClient() {
evhttp_connection_free(this->conn);
}
void HTTPServer::WebsocketClient::reset_pending_frame() {
this->pending_opcode = 0xFF;
this->pending_data.clear();
}
shared_ptr<HTTPServer::WebsocketClient> HTTPServer::enable_websockets(struct evhttp_request* req) {
if (evhttp_request_get_command(req) != EVHTTP_REQ_GET) {
return nullptr;
}
struct evkeyvalq* in_headers = evhttp_request_get_input_headers(req);
const char* connection_header = evhttp_find_header(in_headers, "Connection");
if (!connection_header || strcasecmp(connection_header, "upgrade")) {
return nullptr;
}
const char* upgrade_header = evhttp_find_header(in_headers, "Upgrade");
if (!upgrade_header || strcasecmp(upgrade_header, "websocket")) {
return nullptr;
}
const char* sec_websocket_key_header = evhttp_find_header(in_headers, "Sec-WebSocket-Key");
if (!sec_websocket_key_header) {
return nullptr;
}
// Note: it's important that we make a copy of this header's value since
// we're about to free the original
string sec_websocket_key = sec_websocket_key_header;
string sec_websocket_accept_data = sec_websocket_key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
string sec_websocket_accept = base64_encode(sha1(sec_websocket_accept_data));
// Hijack the bufferevent since it's no longer handling HTTP at all
struct evhttp_connection* conn = evhttp_request_get_connection(req);
struct bufferevent* bev = evhttp_connection_get_bufferevent(conn);
bufferevent_setcb(bev, &this->dispatch_on_websocket_read, NULL, &this->dispatch_on_websocket_error, this);
bufferevent_enable(bev, EV_READ | EV_WRITE);
// Send the HTTP reply, which enables websockets
struct evbuffer* out_buf = bufferevent_get_output(bev);
evbuffer_add_printf(out_buf, "HTTP/1.1 101 Switching Protocols\r\n\
Upgrade: websocket\r\n\
Connection: upgrade\r\n\
Sec-WebSocket-Accept: %s\r\n\
\r\n",
sec_websocket_accept.c_str());
return this->bev_to_websocket_client.emplace(bev, new WebsocketClient(conn)).first->second;
}
void HTTPServer::dispatch_on_websocket_read(struct bufferevent* bev, void* ctx) {
reinterpret_cast<HTTPServer*>(ctx)->on_websocket_read(bev);
}
void HTTPServer::dispatch_on_websocket_error(struct bufferevent* bev, short events, void* ctx) {
reinterpret_cast<HTTPServer*>(ctx)->on_websocket_error(bev, events);
}
void HTTPServer::on_websocket_read(struct bufferevent* bev) {
struct evbuffer* in_buf = bufferevent_get_input(bev);
for (;;) {
// We need at most 10 bytes to determine if there's a valid frame, or as
// little as 2
string header_data(10, '\0');
ssize_t bytes_read = evbuffer_copyout(in_buf, const_cast<char*>(header_data.data()), header_data.size());
if (bytes_read < 2) {
break; // Full header not yet available
}
// Get the payload size
bool has_mask = header_data[1] & 0x80;
size_t header_size = 2;
size_t payload_size = header_data[1] & 0x7F;
if (payload_size == 0x7F) {
if (bytes_read < 10) {
break; // Full 64-bit header not yet available
}
payload_size = bswap64(*reinterpret_cast<const uint64_t*>(&header_data[2]));
header_size = 10;
} else if (payload_size == 0x7E) {
if (bytes_read < 4) {
break; // Full 16-bit size header not yet available
}
payload_size = bswap16(*reinterpret_cast<const uint16_t*>(&header_data[2]));
header_size = 4;
}
if (evbuffer_get_length(in_buf) < header_size + payload_size) {
break; // Full message not yet available
}
// Full message is available; skip the header bytes (we already read them)
// and read the masking key if needed
evbuffer_drain(in_buf, header_size);
uint8_t mask_key[4];
if (has_mask) {
evbuffer_remove(in_buf, mask_key, 4);
}
shared_ptr<WebsocketClient> c = this->bev_to_websocket_client.at(bev);
c->last_communication_time = now();
// Read and unmask message data
string payload(payload_size, '\0');
evbuffer_remove(in_buf, const_cast<char*>(payload.data()), payload_size);
if (has_mask) {
for (size_t x = 0; x < payload_size; x++) {
payload[x] ^= mask_key[x & 3];
}
}
// If the current message is a control message, respond appropriately
// (these can be sent in the middle of fragmented messages)
uint8_t opcode = header_data[0] & 0x0F;
if (opcode & 0x08) {
if (opcode == 0x0A) {
// Ping response; ignore it
} else if (opcode == 0x08) {
// Close message
this->send_websocket_message(bev, payload, 0x08);
this->disconnect_websocket_client(bev);
} else if (opcode == 0x09) {
// Ping message
this->send_websocket_message(bev, payload, 0x0A);
} else {
// Unknown control message type
this->disconnect_websocket_client(bev);
}
break;
}
// If there's an existing pending message, the current message's opcode
// should be zero; if there's no pending message, it must not be zero
if ((c->pending_opcode != 0xFF) == (opcode != 0)) {
this->disconnect_websocket_client(bev);
break;
}
// At this point, we have read a full message; we must not break out of
// this loop in case there are further messages available.
// Save the message opcode, if present, and append the frame data
if (opcode) {
c->pending_opcode = opcode;
}
c->pending_data += payload;
// If the FIN bit is set, then the frame is complete - append the payload
// to any pending payloads and call the message handler. If the FIN bit
// isn't set, we need to receive at least one continuation frame to
// complete the message.
if (header_data[0] & 0x80) {
this->handle_websocket_message(c, c->pending_opcode, c->pending_data);
c->reset_pending_frame();
}
}
}
void HTTPServer::on_websocket_error(struct bufferevent* bev, short events) {
if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
this->disconnect_websocket_client(bev);
}
}
void HTTPServer::disconnect_websocket_client(struct bufferevent* bev) {
auto it = this->bev_to_websocket_client.find(bev);
this->handle_websocket_disconnect(it->second);
this->bev_to_websocket_client.erase(it);
}
void HTTPServer::send_websocket_message(struct bufferevent* bev,
const string& message, uint8_t opcode) {
string header;
header.push_back(0x80 | (opcode & 0x0F));
if (message.size() > 65535) {
header.push_back(0x7F);
header.resize(10);
*reinterpret_cast<uint64_t*>(const_cast<char*>(header.data() + 2)) = bswap64(message.size());
} else if (message.size() > 0x7D) {
header.push_back(0x7E);
header.resize(4);
*reinterpret_cast<uint16_t*>(const_cast<char*>(header.data() + 2)) = bswap16(message.size());
} else {
header.push_back(message.size());
}
struct evbuffer* out_buf = bufferevent_get_output(bev);
evbuffer_add(out_buf, header.data(), header.size());
evbuffer_add(out_buf, message.data(), message.size());
}
void HTTPServer::send_websocket_message(shared_ptr<WebsocketClient> c, const string& message, uint8_t opcode) {
this->send_websocket_message(c->bev, message, opcode);
}
void HTTPServer::handle_websocket_message(shared_ptr<WebsocketClient>, uint8_t, const string&) {
// Currently we just ignore any messages from the client
}
void HTTPServer::handle_websocket_disconnect(shared_ptr<WebsocketClient> c) {
this->rare_drop_subscribers.erase(c);
}
void HTTPServer::send_rare_drop_notification(shared_ptr<const JSON> message) {
forward_to_event_thread(this->base, [this, message]() -> void {
if (this->rare_drop_subscribers.empty()) {
return;
}
string serialized = message->serialize();
for (const auto& c : this->rare_drop_subscribers) {
this->send_websocket_message(c, serialized);
}
});
}
void HTTPServer::dispatch_handle_request(struct evhttp_request* req, void* ctx) {
reinterpret_cast<HTTPServer*>(ctx)->handle_request(req);
}
@@ -960,11 +1186,23 @@ void HTTPServer::handle_request(struct evhttp_request* req) {
"/y/proxy-clients",
"/y/lobbies",
"/y/server",
"/y/rare-drops/stream",
"/y/summary",
"/y/all",
});
ret = make_shared<JSON>(JSON::dict({{"endpoints", std::move(endpoints_json)}}));
} else if (uri == "/y/rare-drops/stream") {
auto c = this->enable_websockets(req);
if (!c) {
throw http_error(400, "this path requires a websocket connection");
} else {
this->rare_drop_subscribers.emplace(c);
auto version_message = JSON::dict({{"ServerType", "newserv"}});
this->send_websocket_message(c, version_message.serialize());
return;
}
} else if (uri == "/y/data/ep3-cards") {
ret = make_shared<JSON>(this->generate_ep3_cards_json(false));
} else if (uri == "/y/data/ep3-cards-trial") {
+38
View File
@@ -28,6 +28,8 @@ public:
void schedule_stop();
void wait_for_stop();
void send_rare_drop_notification(std::shared_ptr<const JSON> message);
protected:
class http_error : public std::runtime_error {
public:
@@ -35,11 +37,47 @@ protected:
int code;
};
struct WebsocketClient {
struct evhttp_connection* conn;
struct bufferevent* bev;
uint8_t pending_opcode;
std::string pending_data;
uint64_t last_communication_time;
void* context;
WebsocketClient(struct evhttp_connection* conn);
~WebsocketClient();
void reset_pending_frame();
};
std::shared_ptr<ServerState> state;
std::shared_ptr<struct event_base> base;
std::shared_ptr<struct evhttp> http;
std::thread th;
std::unordered_set<std::shared_ptr<WebsocketClient>> rare_drop_subscribers;
std::unordered_map<struct bufferevent*, std::shared_ptr<WebsocketClient>> bev_to_websocket_client;
std::shared_ptr<WebsocketClient> enable_websockets(struct evhttp_request* req);
static void dispatch_on_websocket_read(struct bufferevent* bev, void* ctx);
static void dispatch_on_websocket_error(struct bufferevent* bev, short events, void* ctx);
void on_websocket_read(struct bufferevent* bev);
void on_websocket_error(struct bufferevent* bev, short events);
void disconnect_websocket_client(struct bufferevent* bev);
void send_websocket_message(struct bufferevent* bev, const std::string& message, uint8_t opcode = 0x01);
void send_websocket_message(std::shared_ptr<WebsocketClient> c, const std::string& message, uint8_t opcode = 0x01);
virtual void handle_websocket_message(std::shared_ptr<WebsocketClient> c, uint8_t opcode, const std::string& message);
virtual void handle_websocket_disconnect(std::shared_ptr<WebsocketClient> c);
void thread_fn();
static void dispatch_handle_request(struct evhttp_request* req, void* ctx);
+6 -7
View File
@@ -2543,7 +2543,6 @@ Action a_run_server_replay_log(
shared_ptr<ServerShell> shell;
shared_ptr<ReplaySession> replay_session;
shared_ptr<HTTPServer> http_server;
if (is_replay) {
config_log.info("Starting proxy server");
state->proxy_server = make_shared<ProxyServer>(base, state);
@@ -2653,10 +2652,10 @@ Action a_run_server_replay_log(
if (!state->http_addresses.empty() || !state->http_addresses.empty()) {
config_log.info("Starting HTTP server");
http_server = make_shared<HTTPServer>(state);
state->http_server = make_shared<HTTPServer>(state);
for (const auto& it : state->http_addresses) {
auto netloc = parse_netloc(it);
http_server->listen(netloc.first, netloc.second);
state->http_server->listen(netloc.first, netloc.second);
}
}
}
@@ -2700,8 +2699,8 @@ Action a_run_server_replay_log(
if (state->bb_patch_server) {
state->bb_patch_server->schedule_stop();
}
if (http_server) {
http_server->schedule_stop();
if (state->http_server) {
state->http_server->schedule_stop();
}
if (state->pc_patch_server) {
config_log.info("Waiting for PC_V2 patch server to stop");
@@ -2711,9 +2710,9 @@ Action a_run_server_replay_log(
config_log.info("Waiting for BB_V4 patch server to stop");
state->bb_patch_server->wait_for_stop();
}
if (http_server) {
if (state->http_server) {
config_log.info("Waiting for HTTP server to stop");
http_server->wait_for_stop();
state->http_server->wait_for_stop();
}
state->proxy_server.reset(); // Break reference cycle
});
+21 -3
View File
@@ -10,6 +10,7 @@
#include "Client.hh"
#include "Compression.hh"
#include "HTTPServer.hh"
#include "Items.hh"
#include "Lobby.hh"
#include "Loggers.hh"
@@ -2072,9 +2073,26 @@ static void on_pick_up_item_generic(
if (should_send_game_notif || should_send_global_notif) {
string p_name = p->disp.name.decode();
string desc = s->describe_item(c->version(), fi->data, true);
string message = string_printf("$C6%s$C7 found\n%s", p_name.c_str(), desc.c_str());
string bb_message = string_printf("$C6%s$C7 has found %s", p_name.c_str(), desc.c_str());
string desc_ingame = s->describe_item(c->version(), fi->data, true);
string desc_http = s->describe_item(c->version(), fi->data, false);
if (s->http_server) {
auto message = make_shared<JSON>(JSON::dict({
{"PlayerAccountID", c->login->account->account_id},
{"PlayerName", p_name},
{"PlayerVersion", name_for_enum(c->version())},
{"GameName", l->name},
{"GameDropMode", name_for_enum(l->drop_mode)},
{"ItemData", fi->data.hex()},
{"ItemDescription", desc_http},
{"NotifyGame", should_send_game_notif},
{"NotifyServer", should_send_global_notif},
}));
s->http_server->send_rare_drop_notification(message);
}
string message = string_printf("$C6%s$C7 found\n%s", p_name.c_str(), desc_ingame.c_str());
string bb_message = string_printf("$C6%s$C7 has found %s", p_name.c_str(), desc_ingame.c_str());
if (should_send_global_notif) {
for (auto& it : s->channel_to_client) {
if (it.second->login &&
+2
View File
@@ -36,6 +36,7 @@
class ProxyServer;
class Server;
class IPStackSimulator;
class HTTPServer;
struct PortConfiguration {
std::string name;
@@ -270,6 +271,7 @@ struct ServerState : public std::enable_shared_from_this<ServerState> {
std::shared_ptr<Server> game_server;
std::shared_ptr<PatchServer> pc_patch_server;
std::shared_ptr<PatchServer> bb_patch_server;
std::shared_ptr<HTTPServer> http_server;
explicit ServerState(const std::string& config_filename = "");
ServerState(std::shared_ptr<struct event_base> base, const std::string& config_filename, bool is_replay);