• 如何基于Rust的P2P网络组件构建实时通讯系统
  • 发布于 1周前
  • 70 热度
    0 评论
在WebSocket基础设施上创建点对点(P2P)网络似乎是一项艰巨的任务。在这篇文章中,我们将介绍基于Rust的P2P网络的关键组件,探索每个部分如何构建无缝的WebSocket基础设施。使用它们构建一个健壮且高效的P2P网络,允许节点之间的实时通信。

P2P网络简介
点对点网络支持节点之间的分散通信,允许数据交换而不依赖于中心服务器。网络中的每个参与者,或“对等端”,可以同时充当客户端和服务器端,促进直接连接和通信。在我们的示例中,我们使用WebSocket,它在单个TCP连接上提供全双工通信通道,以促进这种实时交互。

项目概述
我们的项目旨在演示如何在Rust中使用WebSocket建立P2P网络,利用Tokio异步运行时的强大功能。我们将探讨以下关键组件:
命令行参数解析:使用clap解析对等url和绑定地址。
WebSocket Actor:管理到对等端的WebSocket连接。
网络状态管理:维护网络的状态,包括连接的对等点。
连接处理:管理对等端WebSocket连接的生命周期。
广播消息:定期向所有连接的对等端发送消息。
优雅关闭:优雅地处理中断以关闭网络。

项目开发
使用以下命令创建一个Rust新项目:
cargo new p2p-ws-example
在Cargo.toml文件中加入以下依赖项:
[dependencies]
clap = { version = "4.5.13", features = ["derive"] }
env_logger = "0.11.5"
futures = "0.3.30"
log = "0.4.22"
serde = "1.0.204"
tokio = { version = "1.39.2", features = ["full"] }
tokio-tungstenite = "0.23.1"
tokio-util = { version = "0.7.11", features = ["full"] }
命令行参数解析
我们将从使用clap定义命令行参数开始。这允许我们在启动P2P节点时指定对等url和绑定地址。
use std::net::ToSocketAddrs;
use clap::Parser;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// 要连接的客户端地址列表
    #[arg(short, long, value_delimiter = ',', value_parser = parse_peer)]
    peers: Vec<String>,
    /// 堆代码 duidaima.com
    /// 绑定服务器的地址
    #[arg(short, long, value_parser = parse_bind)]
    bind: String,
}

/// 解析并验证客户端url
fn parse_peer(s: &str) -> Result<String, String> {
    // 验证以ws://或wss://开头的URL
    if s.starts_with("ws://") {
        let ip_port = &s[5..];
        if let Ok(_socket_addr) = ip_port.to_socket_addrs() {
            return Ok(s.to_string());
        }
    }
    Err(format!("Invalid client URL: {}", s))
}

/// 解析并验证绑定地址
fn parse_bind(s: &str) -> Result<String, String> {
    if let Ok(_socket_addr) = s.to_socket_addrs() {
        return Ok(s.to_string());
    }
    Err(format!("Invalid bind address: {}", s))
}
peers:以逗号分隔的WebSocket url列表,以对等端的形式连接。

bind:连接绑定服务器的地址。


WebSocket Actor
WebSocketActor结构体管理WebSocket连接。它建立到给定URL的连接,并处理消息的发送和接收。
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};

// 定义 WebSocket actor
struct WebSocketActor {
    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl WebSocketActor {
    async fn connect(url: &str) -> Option<Self> {
        match connect_async(url).await {
            Ok((conn, _)) => {
                log::info!("Connected successfully to {}", url);
                Some(WebSocketActor { ws_stream: conn })
            }
            Err(e) => {
                log::error!("Connection to {} failed: {:?}", url, e);
                None
            }
        }
    }
}
网络状态管理
P2PWebsocketNetwork结构体维护网络的状态,包括连接的对等点和用于消息广播的主发送方。
use std::{
    collections::HashMap,
    net::{SocketAddr, ToSocketAddrs},
    sync::{Arc, Mutex},
};

use clap::Parser;
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

struct P2PWebsocketNetwork {
    addresses: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<P2PInnerMessage>>>>,
    master: Arc<Mutex<UnboundedSender<P2PInnerMessage>>>,
}

#[derive(Debug)]
struct P2PInnerMessage {
    message: Message,
    tx_handler: UnboundedSender<P2PInnerMessage>,
}
连接管理
我们将使用handle_connection和handle_server_connection函数处理传入和传出的连接。这些函数管理WebSocket连接的生命周期,发送和接收消息。
async fn handle_connection(
    state: Arc<P2PWebsocketNetwork>,
    conn: WebSocketActor,
    token: CancellationToken,
) {
    // 处理连接和消息交换的逻辑
}
state:包含网络信息的共享状态(Arc<P2PWebsocketNetwork>),包括已连接的对等端和消息处理程序。
conn:表示与对等端连接的WebSocketActor实例。
token:用于处理任务取消,以实现安全关机。

handle_connection函数体逻辑如下
1,提取套接字地址:
// 提取套接字地址作为客户端列表的键
let addr = match conn.ws_stream.get_ref() {
    MaybeTlsStream::Plain(f) => f.peer_addr().unwrap(),
    _ => {
        panic!("tls is not supported yet");
    }
};
addr:对等端的socket地址。代码从WebSocket流中提取对等端的地址。目前只支持非tls (plain)流。

2,设置消息通道:
// 这个tx应该在网络状态下共享
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
{
    let mut list = state.addresses.lock().unwrap();
    list.insert(addr, tx.clone());
}
tx和rx:用于在此函数和应用程序的其他部分之间发送和接收消息(P2PInnerMessage)的无界通道。

state.addresses:连接的对等端的地址存储在一个共享的HashMap中,新对等端的地址和它的发送者(tx)被添加到列表中。
3,拆分WebSocket流:
let (mut ws_tx, mut ws_rx) = conn.ws_stream.split();
ws_tx和ws_rx:WebSocket流分为发送方(ws_tx)和接收方(ws_rx),以异步方式处理传入和传出的消息。

4,循环消息处理
loop {
    tokio::select! {
        Some(msg) = ws_rx.next() => {
            log::debug!("Received: {:?}", msg);
            match msg {
                Ok(msg) => {
                    if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
                        message: msg,
                        tx_handler: tx.clone(),
                    }) {
                        log::error!("Failed to send message to master: {:?}", e);
                    }
                },
                Err(e) => {
                    log::error!("Error receiving message or connection closed: {:?}", e);
                    break
                }
            }
        }
        Some(msg) = rx.recv() => {
            log::debug!("Sending: {:?}", msg);
            if let Err(e) = ws_tx.send(msg.message).await {
                log::error!("Failed to send message on socket: {:?}", e);
            }
        }
        _ = token.cancelled() => {
            log::warn!("task cancelled");
            break
        }
    }
}
传入消息(ws_rx):循环使用tokio::select!等待多个异步事件。它首先检查来自对等端的传入消息(ws_rx.next())。如果成功接收到消息,则通过state.master将其转发给主处理程序。这可以对消息进行集中处理或路由。如果发生错误(例如,连接关闭),循环中断,有效地终止连接。

传出消息(rx):循环还检查从应用程序的内部通道(rx.recv())接收到的打算发送给对等端的消息。使用ws_tx.send(msg.message).await将消息发送到对等端。发送中的错误将被记录。

取消令牌:如果取消令牌被触发(Token .cancelled()),循环中断,允许任务干净地退出。

5,从列表中删除对等端
{
    // 从列表中删除客户端
    let mut list = state.addresses.lock().unwrap();
    list.remove(&addr);
}
一旦循环退出(由于错误或取消),对等体的地址将从连接地址列表中删除,以确保状态准确地反映当前网络连接。

handle_server_connection函数的操作逻辑与上述一样,代码如下:
async fn handle_server_connection(
    state: Arc<P2PWebsocketNetwork>,
    raw_stream: TcpStream,
    addr: SocketAddr,
    token: CancellationToken,
) {
    let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
    {
        let mut list = state.addresses.lock().unwrap();
        list.insert(addr, tx.clone());
    }

    log::info!("Incoming TCP connection from: {}", addr);

    let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
        Ok(ws) => ws,
        Err(e) => {
            log::error!("WebSocket handshake error: {:?}", e);
            return;
        }
    };

    log::info!("WebSocket connection established: {}", addr);

    let (mut ws_tx, mut ws_rx) = ws_stream.split();
    loop {
        tokio::select! {
            Some(msg) = ws_rx.next() => {
                log::debug!("Received: {:?}", msg);
                match msg {
                    Ok(msg) => {
                        if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
                            message: msg,
                            tx_handler: tx.clone(),
                        }) {
                            log::error!("Failed to send message to master: {:?}", e);
                        }
                    },
                    Err(e) => {
                        log::error!("Error receiving message or connection closed: {:?}", e);
                        break
                    }
                }
            }
            Some(msg) = rx.recv() => {
                log::debug!("Sending: {:?}", msg);
                if let Err(e) = ws_tx.send(msg.message).await {
                    log::error!("Failed to send message on socket: {:?}", e);
                }
            }
            _ = token.cancelled() => {
                log::warn!("task cancelled");
                break
            }
        }
    }
    {
        // 从列表中删除客户端
        let mut list = state.addresses.lock().unwrap();
        list.remove(&addr);
    }
}
广播消息
我们的广播功能定期向所有连接的对等点发送消息,展示了P2P网络传播信息的能力。
async fn broadcast(
    state: Arc<P2PWebsocketNetwork>,
    tx: UnboundedSender<P2PInnerMessage>,
    bind: String,
) {
    log::debug!("Broadcast start");

    // 广播到已连接的客户端
    let list = state.addresses.lock().unwrap();

    for (i, cl) in list.iter().enumerate() {
        log::debug!("Broadcasting to {} ", cl.0);
        if let Err(e) = cl.1.send(P2PInnerMessage {
            message: tungstenite::protocol::Message::text(format!(
                "Message to client {} from {}",
                i, bind
            )),
            tx_handler: tx.clone(),
        }) {
            log::error!("Failed to send broadcast message: {:?}", e);
        }
    }
    log::debug!("Broadcast end");
}


main函数
#[tokio::main]
async fn main() {
    let args = Args::parse();
    env_logger::init();

    let cancelation_token = CancellationToken::new();
    let tracker = TaskTracker::new();
    let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
    let network_state: Arc<P2PWebsocketNetwork> = Arc::new(P2PWebsocketNetwork {
        addresses: Arc::new(Mutex::new(HashMap::new())),
        master: Arc::new(Mutex::new(tx.clone())),
    });

    for url in &args.peers {
        log::info!("connecting to {} ...", url);
        if let Some(conn) = WebSocketActor::connect(url).await {
            tracker.spawn(handle_connection(
                network_state.clone(),
                conn,
                cancelation_token.clone(),
            ));
        } else {
            log::warn!("could not connect to server: {url}");
        }
    }

    let listener = TcpListener::bind(&args.bind).await.expect("Failed to bind");

    loop {
        tokio::select! {
            Ok((stream, addr)) = listener.accept() => {
                tracker.spawn(handle_server_connection(
                    network_state.clone(),
                    stream, addr, cancelation_token.clone()));
            }
            Some(msg) = rx.recv() => {
                log::debug!("consuming ->{msg:?}");
            }
            _ = tokio::signal::ctrl_c() => {
                log::warn!("Received Ctrl+C, shutting down...");
                tracker.close();
                cancelation_token.cancel();
                break
            }
            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
                tracker.spawn(broadcast(network_state.clone(), tx.clone(), args.bind.clone()));
            }
        }
    }
    log::info!("waiting for all tasks");
    tracker.wait().await;
    log::debug!("tasks all are stoped");
}
在这里 我们将通过处理Ctrl+C信号并相应地取消任务来确保我们的网络可以优雅地关闭。

运行项目
在不同的终端中,使用这些参数按以下顺序运行程序:
$ RUST_LOG=debug cargo run -- --bind localhost:8080 # 启动第一个节点
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080 --bind localhost:8085 # 启动第二个节点
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080,ws://localhost:8085 --bind localhost:8086 # 启动第三个节点
然后,将看到从每个对等点向所有其他对等点广播消息。这是对所有人的广播!

总结
这篇文章使用Rust构建基于WebSocket的P2P网络示例,它提供了一种强大而有效的方式来实现节点之间的实时通信。通过理解代码的每个部分,可以扩展和定制这个示例以满足特定需求,无论是分散的应用程序,实时数据共享还是分布式计算任务。
用户评论