闽公网安备 35020302035485号
cargo new p2p-ws-example在Cargo.toml文件中加入以下依赖项:
[dependencies]
clap = { version = "4.5.13", features = ["derive"] }
env_logger = "0.11.5"
futures = "0.3.30"
log = "0.4.22"
serde = "1.0.204"
tokio = { version = "1.39.2", features = ["full"] }
tokio-tungstenite = "0.23.1"
tokio-util = { version = "0.7.11", features = ["full"] }
命令行参数解析use std::net::ToSocketAddrs;
use clap::Parser;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// 要连接的客户端地址列表
#[arg(short, long, value_delimiter = ',', value_parser = parse_peer)]
peers: Vec<String>,
/// 堆代码 duidaima.com
/// 绑定服务器的地址
#[arg(short, long, value_parser = parse_bind)]
bind: String,
}
/// 解析并验证客户端url
fn parse_peer(s: &str) -> Result<String, String> {
// 验证以ws://或wss://开头的URL
if s.starts_with("ws://") {
let ip_port = &s[5..];
if let Ok(_socket_addr) = ip_port.to_socket_addrs() {
return Ok(s.to_string());
}
}
Err(format!("Invalid client URL: {}", s))
}
/// 解析并验证绑定地址
fn parse_bind(s: &str) -> Result<String, String> {
if let Ok(_socket_addr) = s.to_socket_addrs() {
return Ok(s.to_string());
}
Err(format!("Invalid bind address: {}", s))
}
peers:以逗号分隔的WebSocket url列表,以对等端的形式连接。bind:连接绑定服务器的地址。
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
// 定义 WebSocket actor
struct WebSocketActor {
ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl WebSocketActor {
async fn connect(url: &str) -> Option<Self> {
match connect_async(url).await {
Ok((conn, _)) => {
log::info!("Connected successfully to {}", url);
Some(WebSocketActor { ws_stream: conn })
}
Err(e) => {
log::error!("Connection to {} failed: {:?}", url, e);
None
}
}
}
}
网络状态管理use std::{
collections::HashMap,
net::{SocketAddr, ToSocketAddrs},
sync::{Arc, Mutex},
};
use clap::Parser;
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
struct P2PWebsocketNetwork {
addresses: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<P2PInnerMessage>>>>,
master: Arc<Mutex<UnboundedSender<P2PInnerMessage>>>,
}
#[derive(Debug)]
struct P2PInnerMessage {
message: Message,
tx_handler: UnboundedSender<P2PInnerMessage>,
}
连接管理async fn handle_connection(
state: Arc<P2PWebsocketNetwork>,
conn: WebSocketActor,
token: CancellationToken,
) {
// 处理连接和消息交换的逻辑
}
state:包含网络信息的共享状态(Arc<P2PWebsocketNetwork>),包括已连接的对等端和消息处理程序。// 提取套接字地址作为客户端列表的键
let addr = match conn.ws_stream.get_ref() {
MaybeTlsStream::Plain(f) => f.peer_addr().unwrap(),
_ => {
panic!("tls is not supported yet");
}
};
addr:对等端的socket地址。代码从WebSocket流中提取对等端的地址。目前只支持非tls (plain)流。// 这个tx应该在网络状态下共享
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
{
let mut list = state.addresses.lock().unwrap();
list.insert(addr, tx.clone());
}
tx和rx:用于在此函数和应用程序的其他部分之间发送和接收消息(P2PInnerMessage)的无界通道。loop {
tokio::select! {
Some(msg) = ws_rx.next() => {
log::debug!("Received: {:?}", msg);
match msg {
Ok(msg) => {
if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
message: msg,
tx_handler: tx.clone(),
}) {
log::error!("Failed to send message to master: {:?}", e);
}
},
Err(e) => {
log::error!("Error receiving message or connection closed: {:?}", e);
break
}
}
}
Some(msg) = rx.recv() => {
log::debug!("Sending: {:?}", msg);
if let Err(e) = ws_tx.send(msg.message).await {
log::error!("Failed to send message on socket: {:?}", e);
}
}
_ = token.cancelled() => {
log::warn!("task cancelled");
break
}
}
}
传入消息(ws_rx):循环使用tokio::select!等待多个异步事件。它首先检查来自对等端的传入消息(ws_rx.next())。如果成功接收到消息,则通过state.master将其转发给主处理程序。这可以对消息进行集中处理或路由。如果发生错误(例如,连接关闭),循环中断,有效地终止连接。{
// 从列表中删除客户端
let mut list = state.addresses.lock().unwrap();
list.remove(&addr);
}
一旦循环退出(由于错误或取消),对等体的地址将从连接地址列表中删除,以确保状态准确地反映当前网络连接。async fn handle_server_connection(
state: Arc<P2PWebsocketNetwork>,
raw_stream: TcpStream,
addr: SocketAddr,
token: CancellationToken,
) {
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
{
let mut list = state.addresses.lock().unwrap();
list.insert(addr, tx.clone());
}
log::info!("Incoming TCP connection from: {}", addr);
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
Ok(ws) => ws,
Err(e) => {
log::error!("WebSocket handshake error: {:?}", e);
return;
}
};
log::info!("WebSocket connection established: {}", addr);
let (mut ws_tx, mut ws_rx) = ws_stream.split();
loop {
tokio::select! {
Some(msg) = ws_rx.next() => {
log::debug!("Received: {:?}", msg);
match msg {
Ok(msg) => {
if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
message: msg,
tx_handler: tx.clone(),
}) {
log::error!("Failed to send message to master: {:?}", e);
}
},
Err(e) => {
log::error!("Error receiving message or connection closed: {:?}", e);
break
}
}
}
Some(msg) = rx.recv() => {
log::debug!("Sending: {:?}", msg);
if let Err(e) = ws_tx.send(msg.message).await {
log::error!("Failed to send message on socket: {:?}", e);
}
}
_ = token.cancelled() => {
log::warn!("task cancelled");
break
}
}
}
{
// 从列表中删除客户端
let mut list = state.addresses.lock().unwrap();
list.remove(&addr);
}
}
广播消息async fn broadcast(
state: Arc<P2PWebsocketNetwork>,
tx: UnboundedSender<P2PInnerMessage>,
bind: String,
) {
log::debug!("Broadcast start");
// 广播到已连接的客户端
let list = state.addresses.lock().unwrap();
for (i, cl) in list.iter().enumerate() {
log::debug!("Broadcasting to {} ", cl.0);
if let Err(e) = cl.1.send(P2PInnerMessage {
message: tungstenite::protocol::Message::text(format!(
"Message to client {} from {}",
i, bind
)),
tx_handler: tx.clone(),
}) {
log::error!("Failed to send broadcast message: {:?}", e);
}
}
log::debug!("Broadcast end");
}
main函数
#[tokio::main]
async fn main() {
let args = Args::parse();
env_logger::init();
let cancelation_token = CancellationToken::new();
let tracker = TaskTracker::new();
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
let network_state: Arc<P2PWebsocketNetwork> = Arc::new(P2PWebsocketNetwork {
addresses: Arc::new(Mutex::new(HashMap::new())),
master: Arc::new(Mutex::new(tx.clone())),
});
for url in &args.peers {
log::info!("connecting to {} ...", url);
if let Some(conn) = WebSocketActor::connect(url).await {
tracker.spawn(handle_connection(
network_state.clone(),
conn,
cancelation_token.clone(),
));
} else {
log::warn!("could not connect to server: {url}");
}
}
let listener = TcpListener::bind(&args.bind).await.expect("Failed to bind");
loop {
tokio::select! {
Ok((stream, addr)) = listener.accept() => {
tracker.spawn(handle_server_connection(
network_state.clone(),
stream, addr, cancelation_token.clone()));
}
Some(msg) = rx.recv() => {
log::debug!("consuming ->{msg:?}");
}
_ = tokio::signal::ctrl_c() => {
log::warn!("Received Ctrl+C, shutting down...");
tracker.close();
cancelation_token.cancel();
break
}
_ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
tracker.spawn(broadcast(network_state.clone(), tx.clone(), args.bind.clone()));
}
}
}
log::info!("waiting for all tasks");
tracker.wait().await;
log::debug!("tasks all are stoped");
}
在这里 我们将通过处理Ctrl+C信号并相应地取消任务来确保我们的网络可以优雅地关闭。$ RUST_LOG=debug cargo run -- --bind localhost:8080 # 启动第一个节点 $ RUST_LOG=debug cargo run -- --peers ws://localhost:8080 --bind localhost:8085 # 启动第二个节点 $ RUST_LOG=debug cargo run -- --peers ws://localhost:8080,ws://localhost:8085 --bind localhost:8086 # 启动第三个节点然后,将看到从每个对等点向所有其他对等点广播消息。这是对所有人的广播!