RobinSR/gameserver/src/net/gateway.rs

223 lines
7.2 KiB
Rust

use std::{
collections::HashMap,
net::SocketAddr,
path::Path,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
time::{Duration, SystemTime},
};
use anyhow::Result;
use common::sr_tools::FreesrData;
use rand::RngCore;
use crate::net::PlayerSession;
use tokio::{
net::UdpSocket,
sync::{Mutex, RwLock, mpsc},
};
use crate::net::packet::NetOperation;
const MAX_PACKET_SIZE: usize = 1400;
pub struct Gateway {
socket: Arc<UdpSocket>,
id_counter: AtomicU32,
sessions: Mutex<HashMap<u32, Arc<RwLock<PlayerSession>>>>,
}
impl Gateway {
pub async fn new(host: &str, port: u16) -> Result<Self> {
let socket = Arc::new(UdpSocket::bind(format!("{host}:{port}")).await?);
Ok(Self {
socket,
id_counter: AtomicU32::new(0),
sessions: Mutex::new(HashMap::new()),
})
}
pub async fn listen(&mut self) -> Result<()> {
tracing::info!(
"KCP Gateway is listening at {}",
self.socket.local_addr().unwrap()
);
let mut buf = [0; MAX_PACKET_SIZE];
loop {
let Ok((len, addr)) = self.socket.recv_from(&mut buf).await else {
continue;
};
match len {
20 => self.process_net_operation(buf[..len].into(), addr).await?,
28.. => self.process_kcp_payload(buf[..len].into(), addr).await,
_ => {
tracing::warn!("unk data len {len}")
}
}
}
}
async fn process_net_operation(&mut self, op: NetOperation, addr: SocketAddr) -> Result<()> {
match (op.head, op.tail) {
(0xFF, 0xFFFFFFFF) => self.establish_kcp_session(op.data, addr).await?,
(0x194, 0x19419494) => self.drop_kcp_session(op.param1, op.param2, addr).await,
_ => tracing::warn!("Unknown magic pair received {:X}-{:X}", op.head, op.tail),
}
Ok(())
}
async fn establish_kcp_session(&mut self, data: u32, addr: SocketAddr) -> Result<()> {
let (conv_id, session_token) = self.next_conv_pair();
tracing::info!("New connection from addr: {addr} with conv_id: {conv_id}");
let session = Arc::new(RwLock::new(PlayerSession::new(
self.socket.clone(),
addr,
conv_id,
session_token,
)));
// Init the json to session
let _ = session
.write()
.await
.json_data
.set(FreesrData::load().await);
let session_ref = session.clone();
// FS watcher
tokio::spawn(async move {
let (tx, mut rx) = mpsc::channel(100);
let mut debouncer =
notify_debouncer_mini::new_debouncer(Duration::from_millis(1000), move |ev| {
let _ = tx.blocking_send(ev);
})
.unwrap();
let path = Path::new("freesr-data.json");
debouncer
.watcher()
.watch(path, notify::RecursiveMode::NonRecursive)
.unwrap();
tracing::info!("watching freesr-data.json changes");
let mut shutdown_rx = session.read().await.shutdown_rx.clone();
let mut last_modified = std::fs::metadata(path)
.and_then(|meta| meta.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
loop {
tokio::select! {
res = rx.recv() => {
let Some(res) = res else {
break;
};
match res {
Ok(events) => {
if events
.iter()
.any(|p| p.path.file_name() == path.file_name())
{
if let Ok(metadata) = std::fs::metadata(path) {
if let Ok(modified) = metadata.modified() {
if modified > last_modified {
last_modified = modified;
let mut session = session.write().await;
if let Some(json) = session.json_data.get_mut() {
let _ = json.update().await;
session.sync_player().await;
tracing::info!("json updated");
}
}
}
}
}
}
Err(e) => eprintln!("json watcher error: {:?}", e),
}
}
_ = shutdown_rx.changed() => {
break;
}
}
}
tracing::info!("unwatch freesr-data.json");
});
let mut sessions = self.sessions.lock().await;
for session in sessions.values_mut() {
let _ = session.write().await.shutdown_tx.send(());
}
sessions.clear();
sessions.insert(conv_id, session_ref);
self.socket
.send_to(
&Vec::from(NetOperation {
head: 0x145,
param1: conv_id,
param2: session_token,
data,
tail: 0x14514545,
}),
addr,
)
.await?;
Ok(())
}
async fn drop_kcp_session(&mut self, conv_id: u32, token: u32, addr: SocketAddr) {
tracing::info!("drop_kcp_session {conv_id} {token}");
let mut sessions = self.sessions.lock().await;
let Some(session) = sessions.get(&conv_id) else {
tracing::warn!("drop_kcp_session failed, no session with conv_id {conv_id} was found");
return;
};
let session = session.write().await;
if session.token == token {
let _ = session.shutdown_tx.send(());
drop(session);
sessions.remove(&conv_id);
tracing::info!("Client from {addr} disconnected");
}
}
async fn process_kcp_payload(&mut self, data: Box<[u8]>, addr: SocketAddr) {
let conv_id = mhy_kcp::get_conv(&data);
let mut sessions = self.sessions.lock().await;
let Some(session) = sessions.get_mut(&conv_id).map(|s| s.clone()) else {
tracing::warn!("Session with conv_id {conv_id} not found!");
return;
};
tokio::spawn(async move {
if let Err(err) = Box::pin(session.write().await.consume(&data)).await {
tracing::error!("An error occurred while processing session ({addr}): {err}");
}
});
}
fn next_conv_pair(&mut self) -> (u32, u32) {
(
self.id_counter.fetch_add(1, Ordering::SeqCst) + 1,
rand::rng().next_u32(),
)
}
}