use std::{ io::Error, net::SocketAddr, pin::Pin, sync::{Arc, OnceLock}, task::{Context, Poll}, }; use anyhow::Result; use common::sr_tools::FreesrData; use mhy_kcp::Kcp; use prost::Message; use proto::{AvatarSync, CmdID, CmdPlayerType, PlayerSyncScNotify}; use tokio::{ io::AsyncWrite, net::UdpSocket, sync::{Mutex, watch}, }; use crate::util; use super::{NetPacket, packet::CommandHandler}; struct RemoteEndPoint { socket: Arc, addr: SocketAddr, } pub struct PlayerSession { pub token: u32, kcp: Arc>>, start_time: u64, pub shutdown_tx: watch::Sender<()>, pub shutdown_rx: watch::Receiver<()>, pub json_data: OnceLock, } impl PlayerSession { pub fn new(socket: Arc, addr: SocketAddr, conv: u32, token: u32) -> Self { let (shutdown_tx, shutdown_rx) = watch::channel(()); Self { token, kcp: Arc::new(Mutex::new(Kcp::new(conv, token, false, RemoteEndPoint { socket, addr, }))), start_time: util::cur_timestamp_secs(), json_data: OnceLock::new(), shutdown_rx, shutdown_tx, } } pub async fn consume(&mut self, buffer: &[u8]) -> Result<()> { let mut kcp = self.kcp.lock().await; kcp.input(buffer)?; kcp.async_update(self.session_time() as u32).await?; kcp.async_flush().await?; let mut packets = Vec::new(); let mut buf = [0; 24756]; while let Ok(length) = kcp.recv(&mut buf) { packets.push(NetPacket::from(&buf[..length])); } drop(kcp); for packet in packets { if packet.cmd_type == CmdPlayerType::CmdPlayerLogoutCsReq as u16 { tracing::info!("Player logged out"); let _ = self.shutdown_tx.send(()); return Ok(()); }; Self::on_message(self, packet.cmd_type, packet.body).await?; } self.kcp .lock() .await .async_update(self.session_time() as u32) .await?; Ok(()) } pub async fn send(&self, body: impl Message + CmdID) -> Result<()> { let mut buf = Vec::new(); body.encode(&mut buf)?; tracing::info!("sent packet with CmdID: {}", body.get_cmd_id()); let payload: Vec = NetPacket { cmd_type: body.get_cmd_id(), head: Vec::new(), body: buf, } .into(); let mut kcp = self.kcp.lock().await; kcp.send(&payload)?; kcp.async_flush().await?; kcp.async_update(self.session_time() as u32).await?; Ok(()) } pub async fn send_raw(&self, payload: NetPacket) -> Result<()> { let mut kcp = self.kcp.lock().await; let payload: Vec = payload.into(); kcp.send(&payload)?; kcp.async_flush().await?; kcp.async_update(self.session_time() as u32).await?; Ok(()) } pub async fn sync_player(&self) { let Some(json) = self.json_data.get() else { tracing::error!("data is not init!"); return; }; // clear relics & lightcones self.send(PlayerSyncScNotify { del_equipment_list: (2000..3500).collect(), del_relic_list: (1..2000).collect(), ..Default::default() }) .await .unwrap(); // Sync avatars self.send(PlayerSyncScNotify { avatar_sync: Some(AvatarSync { avatar_list: json .avatars .values() .map(|avatar| avatar.to_avatar_proto(Option::None, vec![])) .collect::>(), }), multi_path_avatar_type_info_list: json.get_multi_path_info(), ..Default::default() }) .await .unwrap(); // Sync new relics self.send(PlayerSyncScNotify { relic_list: json.relics.iter().map(|v| v.to_relic_proto()).collect(), equipment_list: json .lightcones .iter() .map(|v| v.to_equipment_proto()) .collect(), ..Default::default() }) .await .unwrap(); // Sync new lightcones self.send(PlayerSyncScNotify { avatar_sync: Some(AvatarSync { avatar_list: json .avatars .values() .map(|avatar| { avatar.to_avatar_proto( json.lightcones .iter() .find(|v| v.equip_avatar == avatar.avatar_id), json.relics .iter() .filter(|v| v.equip_avatar == avatar.avatar_id) .collect(), ) }) .collect(), }), ..Default::default() }) .await .unwrap() } fn session_time(&self) -> u64 { util::cur_timestamp_secs() - self.start_time } } // Auto implemented impl CommandHandler for PlayerSession {} impl AsyncWrite for RemoteEndPoint { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.socket.poll_send_to(cx, buf, self.addr) } fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } }