118 lines
3.3 KiB
C++
118 lines
3.3 KiB
C++
#ifdef NETWORK
|
|
|
|
#include "WebSocketClient.h"
|
|
#include <iostream>
|
|
#include <SDL2/SDL.h>
|
|
|
|
namespace ZL {
|
|
|
|
void WebSocketClient::Connect(const std::string& host, uint16_t port) {
|
|
try {
|
|
boost::asio::ip::tcp::resolver resolver(ioc_);
|
|
auto const results = resolver.resolve(host, std::to_string(port));
|
|
|
|
ws_ = std::make_unique<boost::beast::websocket::stream<boost::beast::tcp_stream>>(ioc_);
|
|
|
|
// Выполняем синхронный коннект и handshake для простоты старта
|
|
boost::beast::get_lowest_layer(*ws_).connect(results);
|
|
ws_->handshake(host, "/");
|
|
|
|
connected = true;
|
|
|
|
// Запускаем асинхронное чтение в пуле потоков TaskManager
|
|
startAsyncRead();
|
|
|
|
}
|
|
catch (std::exception& e) {
|
|
std::cerr << "Network Error: " << e.what() << std::endl;
|
|
}
|
|
}
|
|
|
|
void WebSocketClient::startAsyncRead() {
|
|
ws_->async_read(buffer_, [this](boost::beast::error_code ec, std::size_t bytes) {
|
|
if (!ec) {
|
|
std::string msg = boost::beast::buffers_to_string(buffer_.data());
|
|
buffer_.consume(bytes);
|
|
processIncomingMessage(msg);
|
|
startAsyncRead();
|
|
}
|
|
else {
|
|
connected = false;
|
|
}
|
|
});
|
|
}
|
|
|
|
void WebSocketClient::processIncomingMessage(const std::string& msg) {
|
|
// Lock-free push: producer (I/O thread) pushes to its buffer
|
|
readProducerBuf_.load(std::memory_order_relaxed)->push_back(msg);
|
|
}
|
|
|
|
void WebSocketClient::Poll() {
|
|
// Lock-free drain: swap consumer buffer with producer if ours is empty, then process all
|
|
MessageBuf* c = readConsumerBuf_.load(std::memory_order_acquire);
|
|
if (c->empty()) {
|
|
MessageBuf* p = readProducerBuf_.exchange(c, std::memory_order_acq_rel);
|
|
readConsumerBuf_.store(p, std::memory_order_release);
|
|
c = p;
|
|
}
|
|
for (std::string& msg : *c) {
|
|
HandlePollMessage(msg);
|
|
}
|
|
c->clear();
|
|
}
|
|
|
|
|
|
|
|
void WebSocketClient::Send(const std::string& message) {
|
|
if (!connected) return;
|
|
|
|
std::string finalMessage = SignMessage(message);
|
|
auto ss = std::make_shared<std::string>(std::move(finalMessage));
|
|
|
|
// Lock-free push to write queue
|
|
writeProducerBuf_.load(std::memory_order_relaxed)->push_back(ss);
|
|
|
|
// Start write chain if not already writing
|
|
bool expected = false;
|
|
if (isWriting_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
|
|
doWrite();
|
|
}
|
|
}
|
|
|
|
void WebSocketClient::doWrite() {
|
|
// Lock-free: take next message from consumer buffer; swap buffers if drained
|
|
WriteBuf* c = writeConsumerBuf_.load(std::memory_order_acquire);
|
|
if (currentWriteBuf_ == nullptr || currentWriteIndex_ >= currentWriteBuf_->size()) {
|
|
if (currentWriteBuf_) {
|
|
currentWriteBuf_->clear();
|
|
}
|
|
currentWriteBuf_ = c;
|
|
if (currentWriteBuf_->empty()) {
|
|
WriteBuf* p = writeProducerBuf_.exchange(currentWriteBuf_, std::memory_order_acq_rel);
|
|
writeConsumerBuf_.store(p, std::memory_order_release);
|
|
currentWriteBuf_ = p;
|
|
}
|
|
currentWriteIndex_ = 0;
|
|
}
|
|
if (currentWriteIndex_ >= currentWriteBuf_->size()) {
|
|
isWriting_.store(false, std::memory_order_release);
|
|
return;
|
|
}
|
|
|
|
std::shared_ptr<std::string> message = (*currentWriteBuf_)[currentWriteIndex_++];
|
|
|
|
ws_->async_write(
|
|
boost::asio::buffer(*message),
|
|
[this, message](boost::beast::error_code ec, std::size_t) {
|
|
if (ec) {
|
|
connected = false;
|
|
isWriting_.store(false, std::memory_order_release);
|
|
return;
|
|
}
|
|
doWrite();
|
|
}
|
|
);
|
|
}
|
|
}
|
|
|
|
#endif |