#include "WebSocketClient.h" #include #include #include "RemotePlayer.h" // Вспомогательный split std::vector split(const std::string& s, char delimiter) { std::vector tokens; std::string token; std::istringstream tokenStream(s); while (std::getline(tokenStream, token, delimiter)) { tokens.push_back(token); } return tokens; } namespace ZL { void WebSocketClient::Connect(const std::string& host, uint16_t port) { try { boost::asio::ip::tcp::resolver resolver(ioc_); auto const results = resolver.resolve(host, std::to_string(port)); ws_ = std::make_unique>(ioc_); // Выполняем синхронный коннект и handshake для простоты старта boost::beast::get_lowest_layer(*ws_).connect(results); ws_->handshake(host, "/"); connected = true; // Запускаем асинхронное чтение в пуле потоков TaskManager startAsyncRead(); } catch (std::exception& e) { std::cerr << "Network Error: " << e.what() << std::endl; } } void WebSocketClient::startAsyncRead() { ws_->async_read(buffer_, [this](boost::beast::error_code ec, std::size_t bytes) { if (!ec) { std::string msg = boost::beast::buffers_to_string(buffer_.data()); buffer_.consume(bytes); // Тяжелая десериализация происходит здесь (в потоке TaskManager) processIncomingMessage(msg); startAsyncRead(); } else { connected = false; } }); } void WebSocketClient::processIncomingMessage(const std::string& msg) { // Логика парсинга... if (msg.rfind("ID:", 0) == 0) { clientId = std::stoi(msg.substr(3)); } // Безопасно кладем в очередь для главного потока std::lock_guard lock(queueMutex); messageQueue.push(msg); } void WebSocketClient::Poll() { std::lock_guard lock(queueMutex); while (!messageQueue.empty()) { auto nowTime = std::chrono::system_clock::now(); //Apply server delay: nowTime -= std::chrono::milliseconds(CLIENT_DELAY); auto now_ms = std::chrono::duration_cast( nowTime.time_since_epoch() ).count(); std::string msg = messageQueue.front(); messageQueue.pop(); if (msg.rfind("EVENT:", 0) == 0) { auto parts = split(msg, ':'); if (parts.size() < 5) continue; // EVENT:ID:TYPE:TIME:DATA... int remoteId = std::stoi(parts[1]); std::string subType = parts[2]; uint64_t sentTime = std::stoull(parts[3]); ClientState remoteState; remoteState.id = remoteId; std::chrono::system_clock::time_point uptime_timepoint{ std::chrono::duration_cast(std::chrono::milliseconds(sentTime)) }; remoteState.lastUpdateServerTime = uptime_timepoint; if (subType == "VEL") { remoteState.selectedVelocity = std::stoi(parts[4]); int startFrom = 5; remoteState.position = { std::stof(parts[startFrom]), std::stof(parts[startFrom + 1]), std::stof(parts[startFrom + 2]) }; Eigen::Quaternionf q( std::stof(parts[startFrom + 3]), std::stof(parts[startFrom + 4]), std::stof(parts[startFrom + 5]), std::stof(parts[startFrom + 6])); remoteState.rotation = q.toRotationMatrix(); remoteState.currentAngularVelocity = Eigen::Vector3f{ std::stof(parts[startFrom + 7]), std::stof(parts[startFrom + 8]), std::stof(parts[startFrom + 9]) }; remoteState.velocity = std::stof(parts[startFrom + 10]); remoteState.selectedVelocity = std::stoi(parts[startFrom + 11]); remoteState.discreteMag = std::stof(parts[startFrom + 12]); remoteState.discreteAngle = std::stoi(parts[startFrom + 13]); } else if (subType == "ROT") { remoteState.discreteAngle = std::stoi(parts[4]); remoteState.discreteMag = std::stof(parts[5]); int startFrom = 6; remoteState.position = { std::stof(parts[startFrom]), std::stof(parts[startFrom + 1]), std::stof(parts[startFrom + 2]) }; Eigen::Quaternionf q( std::stof(parts[startFrom + 3]), std::stof(parts[startFrom + 4]), std::stof(parts[startFrom + 5]), std::stof(parts[startFrom + 6])); remoteState.rotation = q.toRotationMatrix(); remoteState.currentAngularVelocity = Eigen::Vector3f{ std::stof(parts[startFrom + 7]), std::stof(parts[startFrom + 8]), std::stof(parts[startFrom + 9]) }; remoteState.velocity = std::stof(parts[startFrom + 10]); remoteState.selectedVelocity = std::stoi(parts[startFrom + 11]); remoteState.discreteMag = std::stof(parts[startFrom + 12]); remoteState.discreteAngle = std::stoi(parts[startFrom + 13]); } else if (subType == "PING") { //remoteState.discreteAngle = std::stoi(parts[4]); //remoteState.discreteMag = std::stof(parts[5]); int startFrom = 4; remoteState.position = { std::stof(parts[startFrom]), std::stof(parts[startFrom + 1]), std::stof(parts[startFrom + 2]) }; Eigen::Quaternionf q( std::stof(parts[startFrom + 3]), std::stof(parts[startFrom + 4]), std::stof(parts[startFrom + 5]), std::stof(parts[startFrom + 6])); remoteState.rotation = q.toRotationMatrix(); remoteState.currentAngularVelocity = Eigen::Vector3f{ std::stof(parts[startFrom + 7]), std::stof(parts[startFrom + 8]), std::stof(parts[startFrom + 9]) }; remoteState.velocity = std::stof(parts[startFrom + 10]); remoteState.selectedVelocity = std::stoi(parts[startFrom + 11]); remoteState.discreteMag = std::stof(parts[startFrom + 12]); remoteState.discreteAngle = std::stoi(parts[startFrom + 13]); } { std::lock_guard pLock(playersMutex); auto& rp = remotePlayers[remoteId]; rp.timedRemoteStates.push_back(remoteState); auto cutoff_time = nowTime - std::chrono::milliseconds(CUTOFF_TIME); while (rp.timedRemoteStates.size() > 0 && rp.timedRemoteStates[0].lastUpdateServerTime < cutoff_time) { rp.timedRemoteStates.erase(rp.timedRemoteStates.begin()); } } /* std::lock_guard pLock(playersMutex); if (remotePlayers.count(remoteId)) { auto& rp = remotePlayers[remoteId]; if (subType == "VEL") { rp.selectedVelocity = std::stoi(parts[4]); int startFrom = 5; rp.position = { std::stof(parts[startFrom]), std::stof(parts[startFrom+1]), std::stof(parts[startFrom+2]) }; Eigen::Quaternionf q( std::stof(parts[startFrom + 3]), std::stof(parts[startFrom + 4]), std::stof(parts[startFrom + 5]), std::stof(parts[startFrom + 6])); rp.rotation = q.toRotationMatrix(); rp.currentAngularVelocity = Eigen::Vector3f{ std::stof(parts[startFrom + 7]), std::stof(parts[startFrom + 8]), std::stof(parts[startFrom + 9]) }; rp.velocity = std::stof(parts[startFrom + 10]); rp.selectedVelocity = std::stoi(parts[startFrom + 11]); rp.discreteMag = std::stof(parts[startFrom + 12]); rp.discreteAngle = std::stoi(parts[startFrom + 13]); } else if (subType == "ROT") { rp.discreteAngle = std::stoi(parts[4]); rp.discreteMag = std::stof(parts[5]); int startFrom = 6; rp.position = { std::stof(parts[startFrom]), std::stof(parts[startFrom + 1]), std::stof(parts[startFrom + 2]) }; Eigen::Quaternionf q( std::stof(parts[startFrom + 3]), std::stof(parts[startFrom + 4]), std::stof(parts[startFrom + 5]), std::stof(parts[startFrom + 6])); rp.rotation = q.toRotationMatrix(); rp.currentAngularVelocity = Eigen::Vector3f{ std::stof(parts[startFrom + 7]), std::stof(parts[startFrom + 8]), std::stof(parts[startFrom + 9]) }; rp.velocity = std::stof(parts[startFrom + 10]); rp.selectedVelocity = std::stoi(parts[startFrom + 11]); rp.discreteMag = std::stof(parts[startFrom + 12]); rp.discreteAngle = std::stoi(parts[startFrom + 13]); } std::cout << "EVENT Received discreteMag=" << rp.discreteMag << std::endl; // --- КОМПЕНСАЦИЯ ЛАГА --- // Вычисляем задержку в секундах long long lag_s = now_ms - sentTime; // Защита от отрицательного лага (разница часов) или слишком огромного lag_s = std::max(0ll, std::min(lag_s, 1000ll)); if (lag_s > 0) { // "Докручиваем" физику конкретного игрока на величину задержки // В ClientState.h simulate_physics должен использовать // актуальные discreteAngle/Mag rp.simulate_physics(lag_s); } // Обновляем метку времени, чтобы обычная экстраполяция в Game.cpp // знала, что состояние уже актуально на момент now_ms rp.lastUpdateServerTime = nowTime; }*/ } else if (msg.rfind("WORLD_UPDATE|", 0) == 0) { parseWorldUpdate(msg, nowTime); } } } void WebSocketClient::parseWorldUpdate(const std::string& msg, std::chrono::system_clock::time_point now_ms) { // Используем мьютекс, так как этот метод вызывается из сетевого потока (TaskManager) std::lock_guard lock(playersMutex); // Формат: WORLD_UPDATE|server_now_ms|count|id,x,y,z,w,qx,qy,qz,v;... auto parts = split(msg, '|'); if (parts.size() < 4) return; uint64_t serverTime = std::stoull(parts[1]); int count = std::stoi(parts[2]); auto playersData = split(parts[3], ';'); for (const auto& pData : playersData) { auto vals = split(pData, ','); if (vals.size() < 9) continue; int id = std::stoi(vals[0]); if (id == this->clientId) continue; // Пропускаем себя // Логика обновления или создания RemotePlayer updateRemotePlayer(id, vals, serverTime, now_ms); } } void WebSocketClient::updateRemotePlayer(int id, const std::vector& vals, uint64_t serverTime, std::chrono::system_clock::time_point now_ms) { //auto& rp = remotePlayers[id]; ClientState remoteState; remoteState.id = id; remoteState.position = { std::stof(vals[1]), std::stof(vals[2]), std::stof(vals[3]) }; remoteState.rotation = Eigen::Quaternionf(std::stof(vals[4]), std::stof(vals[5]), std::stof(vals[6]), std::stof(vals[7])); // 3. Сохраняем остальные физические параметры (для экстраполяции или отладки) remoteState.velocity = std::stof(vals[8]); remoteState.currentAngularVelocity = { std::stof(vals[9]), std::stof(vals[10]), std::stof(vals[11]) }; remoteState.selectedVelocity = std::stoi(vals[12]); remoteState.discreteMag = std::stof(vals[13]); remoteState.discreteAngle = std::stoi(vals[14]); std::chrono::system_clock::time_point uptime_timepoint{ std::chrono::duration_cast(std::chrono::milliseconds(serverTime)) }; //std::cout << "PING Received discreteMag=" << rp.discreteMag << std::endl; remoteState.lastUpdateServerTime = uptime_timepoint; auto& rp = remotePlayers[id]; rp.timedRemoteStates.push_back(remoteState); auto cutoff_time = now_ms - std::chrono::milliseconds(CUTOFF_TIME); while (rp.timedRemoteStates.size() > 0 && rp.timedRemoteStates[0].lastUpdateServerTime < cutoff_time) { rp.timedRemoteStates.erase(rp.timedRemoteStates.begin()); } //rp.apply_lag_compensation(now_ms); } void WebSocketClient::Send(const std::string& message) { if (!connected) return; auto ss = std::make_shared(message); std::lock_guard lock(writeMutex_); writeQueue_.push(ss); // Если сейчас ничего не записывается, инициируем первую запись if (!isWriting_) { doWrite(); } } void WebSocketClient::doWrite() { // Эта функция всегда вызывается под мьютексом или из колбэка if (writeQueue_.empty()) { isWriting_ = false; return; } isWriting_ = true; auto message = writeQueue_.front(); // Захватываем self (shared_from_this), чтобы объект не удалился во время записи ws_->async_write( boost::asio::buffer(*message), [this, message](boost::beast::error_code ec, std::size_t) { if (ec) { connected = false; return; } std::lock_guard lock(writeMutex_); writeQueue_.pop(); // Удаляем отправленное сообщение doWrite(); // Проверяем следующее } ); } }