Try out Cursor

This commit is contained in:
Vladislav Khorev 2026-02-22 21:20:39 +03:00
parent d883b58260
commit f6cc30a30c
4 changed files with 85 additions and 119 deletions

View File

@ -1,4 +1,4 @@
#ifdef NETWORK #ifdef NETWORK
#include "WebSocketClient.h" #include "WebSocketClient.h"
#include <iostream> #include <iostream>
@ -43,34 +43,22 @@ namespace ZL {
} }
void WebSocketClient::processIncomingMessage(const std::string& msg) { void WebSocketClient::processIncomingMessage(const std::string& msg) {
// Логика парсинга... // Lock-free push: producer (I/O thread) pushes to its buffer
/*if (msg.rfind("ID:", 0) == 0) { readProducerBuf_.load(std::memory_order_relaxed)->push_back(msg);
clientId = std::stoi(msg.substr(3));
}*/
// Безопасно кладем в очередь для главного потока
std::lock_guard<std::mutex> lock(queueMutex);
messageQueue.push(msg);
} }
void WebSocketClient::Poll() { void WebSocketClient::Poll() {
std::lock_guard<std::mutex> lock(queueMutex); // Lock-free drain: swap consumer buffer with producer if ours is empty, then process all
MessageBuf* c = readConsumerBuf_.load(std::memory_order_acquire);
while (!messageQueue.empty()) { if (c->empty()) {
/* MessageBuf* p = readProducerBuf_.exchange(c, std::memory_order_acq_rel);
auto nowTime = std::chrono::system_clock::now(); readConsumerBuf_.store(p, std::memory_order_release);
c = p;
//Apply server delay: }
nowTime -= std::chrono::milliseconds(CLIENT_DELAY); for (std::string& msg : *c) {
auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
nowTime.time_since_epoch()
).count();*/
std::string msg = messageQueue.front();
messageQueue.pop();
HandlePollMessage(msg); HandlePollMessage(msg);
} }
c->clear();
} }
@ -79,54 +67,49 @@ namespace ZL {
if (!connected) return; if (!connected) return;
std::string finalMessage = SignMessage(message); std::string finalMessage = SignMessage(message);
/* auto ss = std::make_shared<std::string>(std::move(finalMessage));
#ifdef ENABLE_NETWORK_CHECKSUM
// Вычисляем хеш. Для примера используем std::hash,
// но в продакшене лучше взять быструю реализацию типа MurmurHash3.
size_t hashValue = std::hash<std::string>{}(message + NET_SECRET);
// Преобразуем хеш в hex-строку для передачи // Lock-free push to write queue
std::stringstream ss_hash; writeProducerBuf_.load(std::memory_order_relaxed)->push_back(ss);
ss_hash << std::hex << hashValue;
// Добавляем хеш в конец сообщения через разделитель // Start write chain if not already writing
// Например: "UPD:12345:pos...#hash:a1b2c3d4" bool expected = false;
finalMessage += "#hash:" + ss_hash.str(); if (isWriting_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
#endif
*/
auto ss = std::make_shared<std::string>(finalMessage);
std::lock_guard<std::mutex> lock(writeMutex_);
writeQueue_.push(ss);
// Если сейчас ничего не записывается, инициируем первую запись
if (!isWriting_) {
doWrite(); doWrite();
} }
} }
void WebSocketClient::doWrite() { void WebSocketClient::doWrite() {
// Эта функция всегда вызывается под мьютексом или из колбэка // Lock-free: take next message from consumer buffer; swap buffers if drained
if (writeQueue_.empty()) { WriteBuf* c = writeConsumerBuf_.load(std::memory_order_acquire);
isWriting_ = false; if (currentWriteBuf_ == nullptr || currentWriteIndex_ >= currentWriteBuf_->size()) {
if (currentWriteBuf_) {
currentWriteBuf_->clear();
}
currentWriteBuf_ = c;
if (currentWriteBuf_->empty()) {
WriteBuf* p = writeProducerBuf_.exchange(currentWriteBuf_, std::memory_order_acq_rel);
writeConsumerBuf_.store(p, std::memory_order_release);
currentWriteBuf_ = p;
}
currentWriteIndex_ = 0;
}
if (currentWriteIndex_ >= currentWriteBuf_->size()) {
isWriting_.store(false, std::memory_order_release);
return; return;
} }
isWriting_ = true; std::shared_ptr<std::string> message = (*currentWriteBuf_)[currentWriteIndex_++];
auto message = writeQueue_.front();
// Захватываем self (shared_from_this), чтобы объект не удалился во время записи
ws_->async_write( ws_->async_write(
boost::asio::buffer(*message), boost::asio::buffer(*message),
[this, message](boost::beast::error_code ec, std::size_t) { [this, message](boost::beast::error_code ec, std::size_t) {
if (ec) { if (ec) {
connected = false; connected = false;
isWriting_.store(false, std::memory_order_release);
return; return;
} }
doWrite();
std::lock_guard<std::mutex> lock(writeMutex_);
writeQueue_.pop(); // Удаляем отправленное сообщение
doWrite(); // Проверяем следующее
} }
); );
} }

View File

@ -1,9 +1,11 @@
#pragma once #pragma once
#ifdef NETWORK #ifdef NETWORK
#include "WebSocketClientBase.h" #include "WebSocketClientBase.h"
#include <queue> #include <vector>
#include <atomic>
#include <memory>
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/asio/connect.hpp> #include <boost/asio/connect.hpp>
@ -11,21 +13,30 @@
namespace ZL { namespace ZL {
// Lock-free SPSC double-buffer: producer pushes to one buffer, consumer swaps and drains the other.
// No mutexes; avoids contention under high message load.
class WebSocketClient : public WebSocketClientBase { class WebSocketClient : public WebSocketClientBase {
private: private:
// Переиспользуем io_context из TaskManager
boost::asio::io_context& ioc_; boost::asio::io_context& ioc_;
// Объекты переехали в члены класса
std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>> ws_; std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>> ws_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::queue<std::string> messageQueue; // Incoming messages: I/O thread pushes, main thread drains in Poll()
std::mutex queueMutex; // Защита для messageQueue using MessageBuf = std::vector<std::string>;
MessageBuf readBuffer0_;
MessageBuf readBuffer1_;
std::atomic<MessageBuf*> readProducerBuf_;
std::atomic<MessageBuf*> readConsumerBuf_;
std::queue<std::shared_ptr<std::string>> writeQueue_; // Outgoing messages: main thread pushes in Send(), doWrite()/completion drains
bool isWriting_ = false; using WriteBuf = std::vector<std::shared_ptr<std::string>>;
std::mutex writeMutex_; // Отдельный мьютекс для очереди записи WriteBuf writeBuffer0_;
WriteBuf writeBuffer1_;
std::atomic<WriteBuf*> writeProducerBuf_;
std::atomic<WriteBuf*> writeConsumerBuf_;
WriteBuf* currentWriteBuf_ = nullptr;
size_t currentWriteIndex_ = 0;
std::atomic<bool> isWriting_{ false };
bool connected = false; bool connected = false;
@ -34,7 +45,13 @@ namespace ZL {
void processIncomingMessage(const std::string& msg); void processIncomingMessage(const std::string& msg);
public: public:
explicit WebSocketClient(boost::asio::io_context& ioc) : ioc_(ioc) {} explicit WebSocketClient(boost::asio::io_context& ioc)
: ioc_(ioc)
, readProducerBuf_(&readBuffer0_)
, readConsumerBuf_(&readBuffer1_)
, writeProducerBuf_(&writeBuffer0_)
, writeConsumerBuf_(&writeBuffer1_)
{}
void Connect(const std::string& host, uint16_t port) override; void Connect(const std::string& host, uint16_t port) override;

View File

@ -1,4 +1,4 @@
#ifdef NETWORK #ifdef NETWORK
#include "WebSocketClientBase.h" #include "WebSocketClientBase.h"
#include <iostream> #include <iostream>
@ -70,10 +70,7 @@ namespace ZL {
} }
} }
} }
{ serverBoxes_ = std::move(parsedBoxes);
std::lock_guard<std::mutex> bLock(boxesMutex);
serverBoxes_ = std::move(parsedBoxes);
}
return; return;
} }
if (msg.rfind("RESPAWN_ACK:", 0) == 0) { if (msg.rfind("RESPAWN_ACK:", 0) == 0) {
@ -81,14 +78,8 @@ namespace ZL {
if (parts.size() >= 2) { if (parts.size() >= 2) {
try { try {
int respawnedPlayerId = std::stoi(parts[1]); int respawnedPlayerId = std::stoi(parts[1]);
{ pendingRespawns_.push_back(respawnedPlayerId);
std::lock_guard<std::mutex> rLock(respawnMutex_); remotePlayers.erase(respawnedPlayerId);
pendingRespawns_.push_back(respawnedPlayerId);
}
{
std::lock_guard<std::mutex> pLock(playersMutex);
remotePlayers.erase(respawnedPlayerId);
}
std::cout << "Client: Received RESPAWN_ACK for player " << respawnedPlayerId << std::endl; std::cout << "Client: Received RESPAWN_ACK for player " << respawnedPlayerId << std::endl;
} }
catch (...) {} catch (...) {}
@ -110,10 +101,7 @@ namespace ZL {
); );
destruction.destroyedBy = std::stoi(parts[6]); destruction.destroyedBy = std::stoi(parts[6]);
{ pendingBoxDestructions_.push_back(destruction);
std::lock_guard<std::mutex> lock(boxDestructionsMutex_);
pendingBoxDestructions_.push_back(destruction);
}
std::cout << "Client: Received BOX_DESTROYED for box " << destruction.boxIndex std::cout << "Client: Received BOX_DESTROYED for box " << destruction.boxIndex
<< " destroyed by player " << destruction.destroyedBy << std::endl; << " destroyed by player " << destruction.destroyedBy << std::endl;
@ -146,7 +134,6 @@ namespace ZL {
pi.rotation = q.toRotationMatrix(); pi.rotation = q.toRotationMatrix();
pi.velocity = std::stof(parts[10]); pi.velocity = std::stof(parts[10]);
std::lock_guard<std::mutex> pl(projMutex_);
pendingProjectiles_.push_back(pi); pendingProjectiles_.push_back(pi);
} }
catch (...) {} catch (...) {}
@ -168,7 +155,6 @@ namespace ZL {
); );
di.killerId = std::stoi(parts[6]); di.killerId = std::stoi(parts[6]);
std::lock_guard<std::mutex> dl(deathsMutex_);
pendingDeaths_.push_back(di); pendingDeaths_.push_back(di);
} }
catch (...) {} catch (...) {}
@ -215,9 +201,7 @@ namespace ZL {
} }
{ {
std::lock_guard<std::mutex> pLock(playersMutex);
auto& rp = remotePlayers[remoteId]; auto& rp = remotePlayers[remoteId];
rp.add_state(remoteState); rp.add_state(remoteState);
} }
} }
@ -243,10 +227,7 @@ namespace ZL {
// Используем твой handle_full_sync, начиная со 2-го индекса (пропускаем ID в playerParts) // Используем твой handle_full_sync, начиная со 2-го индекса (пропускаем ID в playerParts)
remoteState.handle_full_sync(playerParts, 1); remoteState.handle_full_sync(playerParts, 1);
{ remotePlayers[rId].add_state(remoteState);
std::lock_guard<std::mutex> pLock(playersMutex);
remotePlayers[rId].add_state(remoteState);
}
} }
} }
} }
@ -263,30 +244,26 @@ namespace ZL {
} }
std::vector<ProjectileInfo> WebSocketClientBase::getPendingProjectiles() { std::vector<ProjectileInfo> WebSocketClientBase::getPendingProjectiles() {
std::lock_guard<std::mutex> lock(projMutex_); std::vector<ProjectileInfo> copy;
auto copy = pendingProjectiles_; copy.swap(pendingProjectiles_);
pendingProjectiles_.clear();
return copy; return copy;
} }
std::vector<DeathInfo> WebSocketClientBase::getPendingDeaths() { std::vector<DeathInfo> WebSocketClientBase::getPendingDeaths() {
std::lock_guard<std::mutex> lock(deathsMutex_); std::vector<DeathInfo> copy;
auto copy = pendingDeaths_; copy.swap(pendingDeaths_);
pendingDeaths_.clear();
return copy; return copy;
} }
std::vector<int> WebSocketClientBase::getPendingRespawns() { std::vector<int> WebSocketClientBase::getPendingRespawns() {
std::lock_guard<std::mutex> lock(respawnMutex_); std::vector<int> copy;
auto copy = pendingRespawns_; copy.swap(pendingRespawns_);
pendingRespawns_.clear();
return copy; return copy;
} }
std::vector<BoxDestroyedInfo> WebSocketClientBase::getPendingBoxDestructions() { std::vector<BoxDestroyedInfo> WebSocketClientBase::getPendingBoxDestructions() {
std::lock_guard<std::mutex> lock(boxDestructionsMutex_); std::vector<BoxDestroyedInfo> copy;
auto copy = pendingBoxDestructions_; copy.swap(pendingBoxDestructions_);
pendingBoxDestructions_.clear();
return copy; return copy;
} }
} }

View File

@ -1,34 +1,25 @@
#pragma once #pragma once
#include "NetworkInterface.h" #include "NetworkInterface.h"
#include <queue> #include <vector>
#include <mutex> #include <unordered_map>
namespace ZL { namespace ZL {
// All state in WebSocketClientBase is only accessed from the main thread:
// HandlePollMessage() runs from Poll(), and get*() are called from Game/Space on the main thread.
// No mutexes needed.
class WebSocketClientBase : public INetworkClient { class WebSocketClientBase : public INetworkClient {
protected: protected:
std::unordered_map<int, ClientStateInterval> remotePlayers; std::unordered_map<int, ClientStateInterval> remotePlayers;
std::mutex playersMutex;
// Серверные коробки // Серверные коробки
std::vector<std::pair<Eigen::Vector3f, Eigen::Matrix3f>> serverBoxes_; std::vector<std::pair<Eigen::Vector3f, Eigen::Matrix3f>> serverBoxes_;
std::mutex boxesMutex;
std::vector<ProjectileInfo> pendingProjectiles_; std::vector<ProjectileInfo> pendingProjectiles_;
std::mutex projMutex_;
std::vector<DeathInfo> pendingDeaths_; std::vector<DeathInfo> pendingDeaths_;
std::mutex deathsMutex_;
std::vector<int> pendingRespawns_; std::vector<int> pendingRespawns_;
std::mutex respawnMutex_;
std::vector<BoxDestroyedInfo> pendingBoxDestructions_; std::vector<BoxDestroyedInfo> pendingBoxDestructions_;
std::mutex boxDestructionsMutex_;
int clientId = -1; int clientId = -1;
int64_t timeOffset = 0; int64_t timeOffset = 0;
@ -42,12 +33,10 @@ namespace ZL {
std::string SignMessage(const std::string& msg); std::string SignMessage(const std::string& msg);
std::unordered_map<int, ClientStateInterval> getRemotePlayers() override { std::unordered_map<int, ClientStateInterval> getRemotePlayers() override {
std::lock_guard<std::mutex> lock(playersMutex);
return remotePlayers; return remotePlayers;
} }
std::vector<std::pair<Eigen::Vector3f, Eigen::Matrix3f>> getServerBoxes() override { std::vector<std::pair<Eigen::Vector3f, Eigen::Matrix3f>> getServerBoxes() override {
std::lock_guard<std::mutex> lock(boxesMutex);
return serverBoxes_; return serverBoxes_;
} }