cargo new p2p-ws-example
在 Cargo.toml 文件中加入以下依赖项:
[dependencies]
clap = { version = "4.5.13", features = ["derive"] }
env_logger = "0.11.5"
futures = "0.3.30"
log = "0.4.22"
serde = "1.0.204"
tokio = { version = "1.39.2", features = ["full"] }
tokio-tungstenite = "0.23.1"
tokio-util = { version = "0.7.11", features = ["full"] }
命令行参数解析
use std::net::ToSocketAddrs; use clap::Parser; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { /// 要连接的客户端地址列表 #[arg(short, long, value_delimiter = ',', value_parser = parse_peer)] peers: Vec<String>, /// 堆代码 duidaima.com /// 绑定服务器的地址 #[arg(short, long, value_parser = parse_bind)] bind: String, } /// 解析并验证客户端url fn parse_peer(s: &str) -> Result<String, String> { // 验证以ws://或wss://开头的URL if s.starts_with("ws://") { let ip_port = &s[5..]; if let Ok(_socket_addr) = ip_port.to_socket_addrs() { return Ok(s.to_string()); } } Err(format!("Invalid client URL: {}", s)) } /// 解析并验证绑定地址 fn parse_bind(s: &str) -> Result<String, String> { if let Ok(_socket_addr) = s.to_socket_addrs() { return Ok(s.to_string()); } Err(format!("Invalid bind address: {}", s)) }peers:以逗号分隔的WebSocket url列表,以对等端的形式连接。
bind:本节点的服务器要绑定(监听)的地址。
use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; // 定义 WebSocket actor struct WebSocketActor { ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>, } impl WebSocketActor { async fn connect(url: &str) -> Option<Self> { match connect_async(url).await { Ok((conn, _)) => { log::info!("Connected successfully to {}", url); Some(WebSocketActor { ws_stream: conn }) } Err(e) => { log::error!("Connection to {} failed: {:?}", url, e); None } } } }网络状态管理
use std::{
    collections::HashMap,
    net::{SocketAddr, ToSocketAddrs},
    sync::{Arc, Mutex},
};

use clap::Parser;
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

/// State shared by every connection task of this node.
struct P2PWebsocketNetwork {
    /// Connected peers, keyed by socket address. Each value is the sending
    /// half of the channel that the peer's connection task reads from.
    addresses: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<P2PInnerMessage>>>>,
    /// Sender used by connection tasks to forward the messages they receive
    /// to the central ("master") consumer.
    master: Arc<Mutex<UnboundedSender<P2PInnerMessage>>>,
}

/// Envelope passed between tasks inside the node.
#[derive(Debug)]
struct P2PInnerMessage {
    /// The WebSocket message being carried.
    message: Message,
    /// Channel sender attached by whoever created the envelope; presumably
    /// meant as a reply path — confirm against the consumers.
    tx_handler: UnboundedSender<P2PInnerMessage>,
}
连接管理
/// Drives one outbound peer connection.
///
/// This is only the skeleton of the function; the snippets that follow in
/// the article show the statements that make up its body.
///
/// * `state` - shared network state.
/// * `conn`  - the established WebSocket connection to the peer.
/// * `token` - cancellation token passed in by the caller.
async fn handle_connection(
    state: Arc<P2PWebsocketNetwork>,
    conn: WebSocketActor,
    token: CancellationToken,
) {
    // Connection handling and message exchange logic goes here.
}
state:包含网络信息的共享状态(Arc<P2PWebsocketNetwork>),包括已连接的对等端和消息处理程序。
// 提取套接字地址作为客户端列表的键 let addr = match conn.ws_stream.get_ref() { MaybeTlsStream::Plain(f) => f.peer_addr().unwrap(), _ => { panic!("tls is not supported yet"); } };addr:对等端的socket地址。代码从WebSocket流中提取对等端的地址。目前只支持非tls (plain)流。
// 这个tx应该在网络状态下共享 let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>(); { let mut list = state.addresses.lock().unwrap(); list.insert(addr, tx.clone()); }tx和rx:用于在此函数和应用程序的其他部分之间发送和接收消息(P2PInnerMessage)的无界通道。
loop { tokio::select! { Some(msg) = ws_rx.next() => { log::debug!("Received: {:?}", msg); match msg { Ok(msg) => { if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage { message: msg, tx_handler: tx.clone(), }) { log::error!("Failed to send message to master: {:?}", e); } }, Err(e) => { log::error!("Error receiving message or connection closed: {:?}", e); break } } } Some(msg) = rx.recv() => { log::debug!("Sending: {:?}", msg); if let Err(e) = ws_tx.send(msg.message).await { log::error!("Failed to send message on socket: {:?}", e); } } _ = token.cancelled() => { log::warn!("task cancelled"); break } } }传入消息(ws_rx):循环使用tokio::select!等待多个异步事件。它首先检查来自对等端的传入消息(ws_rx.next())。如果成功接收到消息,则通过state.master将其转发给主处理程序。这可以对消息进行集中处理或路由。如果发生错误(例如,连接关闭),循环中断,有效地终止连接。
{ // 从列表中删除客户端 let mut list = state.addresses.lock().unwrap(); list.remove(&addr); }一旦循环退出(由于错误或取消),对等体的地址将从连接地址列表中删除,以确保状态准确地反映当前网络连接。
async fn handle_server_connection( state: Arc<P2PWebsocketNetwork>, raw_stream: TcpStream, addr: SocketAddr, token: CancellationToken, ) { let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>(); { let mut list = state.addresses.lock().unwrap(); list.insert(addr, tx.clone()); } log::info!("Incoming TCP connection from: {}", addr); let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await { Ok(ws) => ws, Err(e) => { log::error!("WebSocket handshake error: {:?}", e); return; } }; log::info!("WebSocket connection established: {}", addr); let (mut ws_tx, mut ws_rx) = ws_stream.split(); loop { tokio::select! { Some(msg) = ws_rx.next() => { log::debug!("Received: {:?}", msg); match msg { Ok(msg) => { if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage { message: msg, tx_handler: tx.clone(), }) { log::error!("Failed to send message to master: {:?}", e); } }, Err(e) => { log::error!("Error receiving message or connection closed: {:?}", e); break } } } Some(msg) = rx.recv() => { log::debug!("Sending: {:?}", msg); if let Err(e) = ws_tx.send(msg.message).await { log::error!("Failed to send message on socket: {:?}", e); } } _ = token.cancelled() => { log::warn!("task cancelled"); break } } } { // 从列表中删除客户端 let mut list = state.addresses.lock().unwrap(); list.remove(&addr); } }广播消息
async fn broadcast( state: Arc<P2PWebsocketNetwork>, tx: UnboundedSender<P2PInnerMessage>, bind: String, ) { log::debug!("Broadcast start"); // 广播到已连接的客户端 let list = state.addresses.lock().unwrap(); for (i, cl) in list.iter().enumerate() { log::debug!("Broadcasting to {} ", cl.0); if let Err(e) = cl.1.send(P2PInnerMessage { message: tungstenite::protocol::Message::text(format!( "Message to client {} from {}", i, bind )), tx_handler: tx.clone(), }) { log::error!("Failed to send broadcast message: {:?}", e); } } log::debug!("Broadcast end"); } main函数 #[tokio::main] async fn main() { let args = Args::parse(); env_logger::init(); let cancelation_token = CancellationToken::new(); let tracker = TaskTracker::new(); let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>(); let network_state: Arc<P2PWebsocketNetwork> = Arc::new(P2PWebsocketNetwork { addresses: Arc::new(Mutex::new(HashMap::new())), master: Arc::new(Mutex::new(tx.clone())), }); for url in &args.peers { log::info!("connecting to {} ...", url); if let Some(conn) = WebSocketActor::connect(url).await { tracker.spawn(handle_connection( network_state.clone(), conn, cancelation_token.clone(), )); } else { log::warn!("could not connect to server: {url}"); } } let listener = TcpListener::bind(&args.bind).await.expect("Failed to bind"); loop { tokio::select! { Ok((stream, addr)) = listener.accept() => { tracker.spawn(handle_server_connection( network_state.clone(), stream, addr, cancelation_token.clone())); } Some(msg) = rx.recv() => { log::debug!("consuming ->{msg:?}"); } _ = tokio::signal::ctrl_c() => { log::warn!("Received Ctrl+C, shutting down..."); tracker.close(); cancelation_token.cancel(); break } _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => { tracker.spawn(broadcast(network_state.clone(), tx.clone(), args.bind.clone())); } } } log::info!("waiting for all tasks"); tracker.wait().await; log::debug!("tasks all are stoped"); }在这里 我们将通过处理Ctrl+C信号并相应地取消任务来确保我们的网络可以优雅地关闭。
$ RUST_LOG=debug cargo run -- --bind localhost:8080 # 启动第一个节点
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080 --bind localhost:8085 # 启动第二个节点
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080,ws://localhost:8085 --bind localhost:8086 # 启动第三个节点
然后,将看到每个对等端都会向所有其他对等端广播消息。这是对所有人的广播!