#ifdef NETWORK

#include "WebSocketClient.h"

// NOTE(review): the two angle-bracket include targets that originally followed
// the project header were not legible in the reviewed copy of this file. The
// headers below are the standard ones the code in this file uses directly;
// restore any other header that was listed here if the build needs it.
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

namespace ZL {

namespace {

// Guards push/swap on the incoming-message double buffer
// (readProducerBuf_ / readConsumerBuf_).
//
// The atomic pointers alone do not protect the vectors they point to: a
// producer could load the pointer, lose the CPU, and then push into a vector
// that Poll() had meanwhile swapped in and was iterating or clearing.
//
// NOTE(review): these are file-scope (shared by every WebSocketClient) only
// because the class declaration is not part of this file. They should become
// per-instance members when the header is next touched.
std::mutex g_readQueueMutex;

// Guards push/swap on the outgoing-message double buffer
// (writeProducerBuf_ / writeConsumerBuf_) and the release of isWriting_.
std::mutex g_writeQueueMutex;

} // namespace

/// Resolves host:port, connects synchronously, performs the WebSocket
/// handshake on target "/", then starts the asynchronous read loop.
///
/// Failures are reported on std::cerr and swallowed; callers observe them via
/// `connected` remaining false.
///
/// @param host Host name or address; also sent as the handshake Host value.
/// @param port TCP port to connect to.
void WebSocketClient::Connect(const std::string& host, uint16_t port) {
    try {
        boost::asio::ip::tcp::resolver resolver(ioc_);
        auto const results = resolver.resolve(host, std::to_string(port));

        // From here on the previous stream (if any) is gone, so a stale
        // `connected == true` must not survive a failed reconnect.
        connected = false;

        // Take the stream type from the member itself so this stays in sync
        // with the declaration in WebSocketClient.h.
        ws_ = std::make_unique<decltype(ws_)::element_type>(ioc_);

        // Synchronous connect and handshake, to keep start-up simple.
        boost::beast::get_lowest_layer(*ws_).connect(results);
        ws_->handshake(host, "/");

        connected = true;

        // Kick off the asynchronous read loop.
        startAsyncRead();
    }
    catch (const std::exception& e) {
        std::cerr << "Network Error: " << e.what() << std::endl;
    }
}

/// Issues one async_read and re-arms itself after every successful read.
/// On any read error the loop stops and `connected` is cleared.
///
/// NOTE(review): the handler captures `this`; the client must outlive every
/// pending operation on ioc_ — confirm against the owner's shutdown order.
void WebSocketClient::startAsyncRead() {
    ws_->async_read(buffer_, [this](boost::beast::error_code ec, std::size_t bytes) {
        if (ec) {
            connected = false;
            return;
        }

        std::string msg = boost::beast::buffers_to_string(buffer_.data());
        buffer_.consume(bytes);
        processIncomingMessage(msg);
        startAsyncRead();
    });
}

/// Queues a received message for later delivery by Poll().
///
/// Called from the read completion handler. The push is done under
/// g_readQueueMutex so it cannot overlap Poll()'s buffer swap.
void WebSocketClient::processIncomingMessage(const std::string& msg) {
    std::lock_guard<std::mutex> lock(g_readQueueMutex);
    readProducerBuf_.load(std::memory_order_relaxed)->push_back(msg);
}

/// Delivers every queued incoming message to HandlePollMessage.
///
/// If the consumer buffer is empty it is swapped with the producer buffer
/// (under the lock); the messages are then processed without holding the lock,
/// because after the swap the producer only touches the other buffer.
void WebSocketClient::Poll() {
    MessageBuf* pending = readConsumerBuf_.load(std::memory_order_acquire);

    if (pending->empty()) {
        std::lock_guard<std::mutex> lock(g_readQueueMutex);
        MessageBuf* filled = readProducerBuf_.exchange(pending, std::memory_order_acq_rel);
        readConsumerBuf_.store(filled, std::memory_order_release);
        pending = filled;
    }

    for (std::string& msg : *pending) {
        HandlePollMessage(msg);
    }
    pending->clear();
}

/// Signs `message` with SignMessage and queues it for transmission, starting
/// the write chain if none is currently running. Does nothing when not
/// connected.
///
/// NOTE(review): when this call wins the isWriting_ race, doWrite() — and so
/// ws_->async_write — runs on the caller's thread. Boost.Beast streams are not
/// safe for concurrent use, so if ioc_ runs on another thread this should be
/// dispatched through a strand or boost::asio::post — confirm the threading
/// model.
void WebSocketClient::Send(const std::string& message) {
    if (!connected) {
        return;
    }

    auto payload = std::make_shared<std::string>(SignMessage(message));

    {
        std::lock_guard<std::mutex> lock(g_writeQueueMutex);
        writeProducerBuf_.load(std::memory_order_relaxed)->push_back(std::move(payload));
    }

    // Start the write chain only if nobody else owns it. The push above
    // happened under the same mutex doWrite() holds when it releases
    // isWriting_, so either the running chain will see this message or this
    // CAS succeeds.
    bool expected = false;
    if (isWriting_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
        doWrite();
    }
}

/// Writes the next queued message and chains itself from the completion
/// handler. Must only run while the caller owns isWriting_ (set to true).
///
/// When the current buffer is drained it is cleared and, under
/// g_writeQueueMutex, swapped with the producer buffer. If nothing is pending
/// after the swap, isWriting_ is released while still holding the lock, so a
/// concurrent Send() cannot push a message that nobody picks up.
void WebSocketClient::doWrite() {
    const bool drained =
        currentWriteBuf_ == nullptr || currentWriteIndex_ >= currentWriteBuf_->size();

    if (drained) {
        if (currentWriteBuf_) {
            // The drained buffer belongs to the write chain; no lock needed.
            currentWriteBuf_->clear();
        }

        std::lock_guard<std::mutex> lock(g_writeQueueMutex);

        currentWriteBuf_ = writeConsumerBuf_.load(std::memory_order_acquire);
        if (currentWriteBuf_->empty()) {
            WriteBuf* filled =
                writeProducerBuf_.exchange(currentWriteBuf_, std::memory_order_acq_rel);
            writeConsumerBuf_.store(filled, std::memory_order_release);
            currentWriteBuf_ = filled;
        }
        currentWriteIndex_ = 0;

        if (currentWriteBuf_->empty()) {
            // Nothing left to send: end the chain (still under the lock).
            isWriting_.store(false, std::memory_order_release);
            return;
        }
    }

    // Copy the shared_ptr into the handler so the payload outlives the write.
    auto message = (*currentWriteBuf_)[currentWriteIndex_++];

    ws_->async_write(
        boost::asio::buffer(*message),
        [this, message](boost::beast::error_code ec, std::size_t) {
            if (ec) {
                connected = false;
                isWriting_.store(false, std::memory_order_release);
                return;
            }
            doWrite();
        }
    );
}

} // namespace ZL

#endif