diff --git a/src/network/WebSocketClient.cpp b/src/network/WebSocketClient.cpp index b661890..bdf0232 100644 --- a/src/network/WebSocketClient.cpp +++ b/src/network/WebSocketClient.cpp @@ -1,4 +1,4 @@ -#ifdef NETWORK +#ifdef NETWORK #include "WebSocketClient.h" #include @@ -43,34 +43,22 @@ namespace ZL { } void WebSocketClient::processIncomingMessage(const std::string& msg) { - // Логика парсинга... - /*if (msg.rfind("ID:", 0) == 0) { - clientId = std::stoi(msg.substr(3)); - }*/ - - // Безопасно кладем в очередь для главного потока - std::lock_guard lock(queueMutex); - messageQueue.push(msg); + // Lock-free push: producer (I/O thread) pushes to its buffer + readProducerBuf_.load(std::memory_order_relaxed)->push_back(msg); } void WebSocketClient::Poll() { - std::lock_guard lock(queueMutex); - - while (!messageQueue.empty()) { - /* - auto nowTime = std::chrono::system_clock::now(); - - //Apply server delay: - nowTime -= std::chrono::milliseconds(CLIENT_DELAY); - - auto now_ms = std::chrono::duration_cast( - nowTime.time_since_epoch() - ).count();*/ - std::string msg = messageQueue.front(); - messageQueue.pop(); - + // Lock-free drain: swap consumer buffer with producer if ours is empty, then process all + MessageBuf* c = readConsumerBuf_.load(std::memory_order_acquire); + if (c->empty()) { + MessageBuf* p = readProducerBuf_.exchange(c, std::memory_order_acq_rel); + readConsumerBuf_.store(p, std::memory_order_release); + c = p; + } + for (std::string& msg : *c) { HandlePollMessage(msg); } + c->clear(); } @@ -79,54 +67,49 @@ namespace ZL { if (!connected) return; std::string finalMessage = SignMessage(message); - /* -#ifdef ENABLE_NETWORK_CHECKSUM - // Вычисляем хеш. Для примера используем std::hash, - // но в продакшене лучше взять быструю реализацию типа MurmurHash3. - size_t hashValue = std::hash{}(message + NET_SECRET); + auto ss = std::make_shared(std::move(finalMessage)); - // Преобразуем хеш в hex-строку для передачи - std::stringstream ss_hash; - ss_hash << std::hex << hashValue; + // Lock-free push to write queue + writeProducerBuf_.load(std::memory_order_relaxed)->push_back(ss); - // Добавляем хеш в конец сообщения через разделитель - // Например: "UPD:12345:pos...#hash:a1b2c3d4" - finalMessage += "#hash:" + ss_hash.str(); -#endif -*/ - auto ss = std::make_shared(finalMessage); - - std::lock_guard lock(writeMutex_); - writeQueue_.push(ss); - - // Если сейчас ничего не записывается, инициируем первую запись - if (!isWriting_) { + // Start write chain if not already writing + bool expected = false; + if (isWriting_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { doWrite(); } } void WebSocketClient::doWrite() { - // Эта функция всегда вызывается под мьютексом или из колбэка - if (writeQueue_.empty()) { - isWriting_ = false; + // Lock-free: take next message from consumer buffer; swap buffers if drained + WriteBuf* c = writeConsumerBuf_.load(std::memory_order_acquire); + if (currentWriteBuf_ == nullptr || currentWriteIndex_ >= currentWriteBuf_->size()) { + if (currentWriteBuf_) { + currentWriteBuf_->clear(); + } + currentWriteBuf_ = c; + if (currentWriteBuf_->empty()) { + WriteBuf* p = writeProducerBuf_.exchange(currentWriteBuf_, std::memory_order_acq_rel); + writeConsumerBuf_.store(p, std::memory_order_release); + currentWriteBuf_ = p; + } + currentWriteIndex_ = 0; + } + if (currentWriteIndex_ >= currentWriteBuf_->size()) { + isWriting_.store(false, std::memory_order_release); return; } - isWriting_ = true; - auto message = writeQueue_.front(); + std::shared_ptr message = (*currentWriteBuf_)[currentWriteIndex_++]; - // Захватываем self (shared_from_this), чтобы объект не удалился во время записи ws_->async_write( boost::asio::buffer(*message), [this, message](boost::beast::error_code ec, std::size_t) { if (ec) { connected = false; + isWriting_.store(false, std::memory_order_release); return; } - - std::lock_guard lock(writeMutex_); - writeQueue_.pop(); // Удаляем отправленное сообщение - doWrite(); // Проверяем следующее + doWrite(); } ); } diff --git a/src/network/WebSocketClient.h b/src/network/WebSocketClient.h index 23a03c1..5ccb45c 100644 --- a/src/network/WebSocketClient.h +++ b/src/network/WebSocketClient.h @@ -1,9 +1,11 @@ -#pragma once +#pragma once #ifdef NETWORK #include "WebSocketClientBase.h" -#include +#include +#include +#include #include #include #include @@ -11,21 +13,30 @@ namespace ZL { + // Lock-free SPSC double-buffer: producer pushes to one buffer, consumer swaps and drains the other. + // No mutexes; avoids contention under high message load. class WebSocketClient : public WebSocketClientBase { private: - // Переиспользуем io_context из TaskManager boost::asio::io_context& ioc_; - - // Объекты переехали в члены класса std::unique_ptr> ws_; boost::beast::flat_buffer buffer_; - std::queue messageQueue; - std::mutex queueMutex; // Защита для messageQueue + // Incoming messages: I/O thread pushes, main thread drains in Poll() + using MessageBuf = std::vector; + MessageBuf readBuffer0_; + MessageBuf readBuffer1_; + std::atomic readProducerBuf_; + std::atomic readConsumerBuf_; - std::queue> writeQueue_; - bool isWriting_ = false; - std::mutex writeMutex_; // Отдельный мьютекс для очереди записи + // Outgoing messages: main thread pushes in Send(), doWrite()/completion drains + using WriteBuf = std::vector>; + WriteBuf writeBuffer0_; + WriteBuf writeBuffer1_; + std::atomic writeProducerBuf_; + std::atomic writeConsumerBuf_; + WriteBuf* currentWriteBuf_ = nullptr; + size_t currentWriteIndex_ = 0; + std::atomic isWriting_{ false }; bool connected = false; @@ -34,7 +45,13 @@ namespace ZL { void processIncomingMessage(const std::string& msg); public: - explicit WebSocketClient(boost::asio::io_context& ioc) : ioc_(ioc) {} + explicit WebSocketClient(boost::asio::io_context& ioc) + : ioc_(ioc) + , readProducerBuf_(&readBuffer0_) + , readConsumerBuf_(&readBuffer1_) + , writeProducerBuf_(&writeBuffer0_) + , writeConsumerBuf_(&writeBuffer1_) + {} void Connect(const std::string& host, uint16_t port) override; diff --git a/src/network/WebSocketClientBase.cpp b/src/network/WebSocketClientBase.cpp index de4f03b..48b99f2 100644 --- a/src/network/WebSocketClientBase.cpp +++ b/src/network/WebSocketClientBase.cpp @@ -1,4 +1,4 @@ -#ifdef NETWORK +#ifdef NETWORK #include "WebSocketClientBase.h" #include @@ -70,10 +70,7 @@ namespace ZL { } } } - { - std::lock_guard bLock(boxesMutex); - serverBoxes_ = std::move(parsedBoxes); - } + serverBoxes_ = std::move(parsedBoxes); return; } if (msg.rfind("RESPAWN_ACK:", 0) == 0) { @@ -81,14 +78,8 @@ namespace ZL { if (parts.size() >= 2) { try { int respawnedPlayerId = std::stoi(parts[1]); - { - std::lock_guard rLock(respawnMutex_); - pendingRespawns_.push_back(respawnedPlayerId); - } - { - std::lock_guard pLock(playersMutex); - remotePlayers.erase(respawnedPlayerId); - } + pendingRespawns_.push_back(respawnedPlayerId); + remotePlayers.erase(respawnedPlayerId); std::cout << "Client: Received RESPAWN_ACK for player " << respawnedPlayerId << std::endl; } catch (...) {} @@ -110,10 +101,7 @@ namespace ZL { ); destruction.destroyedBy = std::stoi(parts[6]); - { - std::lock_guard lock(boxDestructionsMutex_); - pendingBoxDestructions_.push_back(destruction); - } + pendingBoxDestructions_.push_back(destruction); std::cout << "Client: Received BOX_DESTROYED for box " << destruction.boxIndex << " destroyed by player " << destruction.destroyedBy << std::endl; @@ -146,7 +134,6 @@ namespace ZL { pi.rotation = q.toRotationMatrix(); pi.velocity = std::stof(parts[10]); - std::lock_guard pl(projMutex_); pendingProjectiles_.push_back(pi); } catch (...) {} @@ -168,7 +155,6 @@ namespace ZL { ); di.killerId = std::stoi(parts[6]); - std::lock_guard dl(deathsMutex_); pendingDeaths_.push_back(di); } catch (...) {} @@ -215,9 +201,7 @@ namespace ZL { } { - std::lock_guard pLock(playersMutex); auto& rp = remotePlayers[remoteId]; - rp.add_state(remoteState); } } @@ -243,10 +227,7 @@ namespace ZL { // Используем твой handle_full_sync, начиная со 2-го индекса (пропускаем ID в playerParts) remoteState.handle_full_sync(playerParts, 1); - { - std::lock_guard pLock(playersMutex); - remotePlayers[rId].add_state(remoteState); - } + remotePlayers[rId].add_state(remoteState); } } } @@ -263,30 +244,26 @@ namespace ZL { } std::vector WebSocketClientBase::getPendingProjectiles() { - std::lock_guard lock(projMutex_); - auto copy = pendingProjectiles_; - pendingProjectiles_.clear(); + std::vector copy; + copy.swap(pendingProjectiles_); return copy; } std::vector WebSocketClientBase::getPendingDeaths() { - std::lock_guard lock(deathsMutex_); - auto copy = pendingDeaths_; - pendingDeaths_.clear(); + std::vector copy; + copy.swap(pendingDeaths_); return copy; } std::vector WebSocketClientBase::getPendingRespawns() { - std::lock_guard lock(respawnMutex_); - auto copy = pendingRespawns_; - pendingRespawns_.clear(); + std::vector copy; + copy.swap(pendingRespawns_); return copy; } std::vector WebSocketClientBase::getPendingBoxDestructions() { - std::lock_guard lock(boxDestructionsMutex_); - auto copy = pendingBoxDestructions_; - pendingBoxDestructions_.clear(); + std::vector copy; + copy.swap(pendingBoxDestructions_); return copy; } } diff --git a/src/network/WebSocketClientBase.h b/src/network/WebSocketClientBase.h index 75480c1..1ca30cb 100644 --- a/src/network/WebSocketClientBase.h +++ b/src/network/WebSocketClientBase.h @@ -1,34 +1,25 @@ -#pragma once +#pragma once #include "NetworkInterface.h" -#include -#include +#include +#include namespace ZL { - + // All state in WebSocketClientBase is only accessed from the main thread: + // HandlePollMessage() runs from Poll(), and get*() are called from Game/Space on the main thread. + // No mutexes needed. class WebSocketClientBase : public INetworkClient { protected: - - std::unordered_map remotePlayers; - std::mutex playersMutex; // Серверные коробки std::vector> serverBoxes_; - std::mutex boxesMutex; std::vector pendingProjectiles_; - std::mutex projMutex_; - std::vector pendingDeaths_; - std::mutex deathsMutex_; - std::vector pendingRespawns_; - std::mutex respawnMutex_; - std::vector pendingBoxDestructions_; - std::mutex boxDestructionsMutex_; int clientId = -1; int64_t timeOffset = 0; @@ -42,12 +33,10 @@ namespace ZL { std::string SignMessage(const std::string& msg); std::unordered_map getRemotePlayers() override { - std::lock_guard lock(playersMutex); return remotePlayers; } std::vector> getServerBoxes() override { - std::lock_guard lock(boxesMutex); return serverBoxes_; }