• 如何使用Rust开发一个实时聊天应用程序
  • 发布于 2个月前
  • 132 热度
    0 评论
  • LoveC
  • 1 粉丝 35 篇博客
  •   
在本文中,我们将演示如何使用Rust和React构建一个实时聊天应用程序,该应用程序提供聊天功能,检查用户状态,并指示用户何时输入。我们将使用WebSockets来启用客户端和服务器的双向通信。

实时聊天应用介绍
实时聊天应用程序允许用户通过文本、语音或视频进行实时交流。这种类型的应用程序需要比其他类型的通信(如电子邮件或即时通讯)更即时的消息传递。

聊天应用程序必须实时工作有以下几个原因:
提高效率:更直接的交流使对话更自然
更快的响应:实时功能可以改善用户体验
更高的可靠性:通过实时功能,消息丢失或延迟的可能性更小

我们使用WebSockets在实时聊天应用程序中实现客户端和服务器之间的双向通信,使用Rust构建WebSocket服务器将使服务器能够处理大量连接而不会减慢速度。


实时聊天应用架构设计
我们将构建一个简单的服务器,我们的应用程序架构将包括以下特性:
聊天:两个用户之间直接通过消息传递
输入提示:当用户开始向对方输入对话内容时,会提示对方
用户状态:用户是在线还是离线

这种架构非常简单,易于实现。它只由几个部分组成:
WebSocket服务器:这是我们应用程序中最重要的组件,它处理用户和聊天室之间的所有通信。
聊天室管理:这个组件负责管理应用程序中的所有聊天室。它将创建、更新和删除聊天室,该组件将位于HTTP服务器上。
用户管理:该组件负责管理应用程序中的所有用户。它将创建、更新和删除用户,该组件也将位于HTTP服务器上。
消息管理:该组件负责管理应用程序中的所有消息。它将创建、更新和删除消息,这个组件将位于WebSocket服务器和HTTP服务器上。它将用于存储来自WebSockets的传入消息,并在用户通过Rest API打开聊天室时检索数据库中已经存在的所有消息

用Rust构建WebSocket服务器
有很多lib可以用来在Rust中编写WebSocket服务器。在本教程中,我们将使用Actix Web,它是一个成熟的软件包,易于使用。

首先,使用以下命令创建一个Rust项目:
cargo new rust-react-chat
接下来,在Cargo.toml文件中加入依赖包:
[dependencies]
actix = "0.13.0"
actix-files = "0.6.2"
actix-web = "4.2.1"
actix-web-actors = "4.1.0"
rand = "0.8.5"
serde = {version = "1.0.147", features = ["derive"]}
serde_json = "1.0.88"
diesel = { version = "2", features = ["sqlite", "r2d2"] }
uuid = { version = "1", features = ["v4", "serde"] }
chrono = "0.4.23"
actix-cors = "0.6.4"
现在,安装diesel_cli,我们将使用这个作为我们的ORM:
cargo install diesel_cli --no-default-features --features sqlite
下面是项目的结构:
├── Cargo.lock
├── Cargo.toml
├── README.md
├── chat.db
├── .env
└── src
    ├── db.rs
    ├── main.rs
    ├── models.rs
    ├── routes.rs
    ├── schema.rs
    ├── server.rs
    └── session.rs
└── static
└── ui
src:这个文件夹包含我们所有的Rust代码
static:这个文件夹包含我们所有的静态资源、HTML文件、JavaScript文件和图像
ui:这个文件夹包含我们的React代码;我们稍后将其编译为静态文件并将其导出到静态文件夹中

数据访问层
设置数据库
接下来,让我们准备数据库。为了保持简单,我们将使用SQLite。下面是schema:

users表:存储用户数据。由于此时我们没有实现完整的身份验证系统,因此我们现在只保存用户名和电话号码
rooms表:存储所有聊天室
conversations表:数据库中所有存储的消息

接下来,为我们的schema生成数据库表。在项目根目录下创建migrations文件夹,并创建.env文件并写入如下内容:
DATABASE_URL="chat.db"
然后执行如下命令:
diesel setup
diesel migration generate create_users
diesel migration generate create_rooms
diesel migration generate create_conversations
分别在下面的文件中写入SQL语句:
-- migrations/xxxx_create_users/up.sql
CREATE TABLE users (
  id TEXT PRIMARY KEY NOT NULL,
  username VARCHAR NOT NULL,
  phone VARCHAR NOT NULL,
  created_at TEXT NOT NULL,
  unique(phone)
)

-- migrations/xxxx_create_rooms/up.sql
CREATE TABLE rooms (
  id TEXT PRIMARY KEY NOT NULL,
  name VARCHAR NOT NULL,
  last_message TEXT NOT NULL,
  participant_ids TEXT NOT NULL,
  created_at TEXT NOT NULL
)

-- migrations/xxxx_create_conversations/up.sql
CREATE TABLE conversations (
  id TEXT PRIMARY KEY NOT NULL,
  room_id TEXT NOT NULL,
  user_id TEXT NOT NULL,
  content VARCHAR NOT NULL,
  created_at TEXT NOT NULL
)
我们还需要添加一些虚拟数据,以便稍后为客户端初始渲染提供一些示例:
diesel migration generate dummy_data
数据插入SQL如下:
-- migrations/xxxx_generate_dummy_data/up.sql
INSERT INTO users(id, username, phone, created_at) 
VALUES
("4fbd288c-d3b2-4f78-adcf-def976902d50","Ahmad Rosid","123","2022-11-23T07:56:30.214162+00:00"),
("1e9a12c1-e98c-4a83-a55a-32cc548a169d","Ashley Young","345","2022-11-23T07:56:30.214162+00:00"),
("1bc833808-05ed-455a-9d26-64fe1d96d62d","Charles Edward","678","2022-12-23T07:56:30.214162+00:00");
INSERT INTO rooms(id, name, last_message, participant_ids, created_at)
VALUES
("f061383b-0393-4ce8-9a85-f31d03762263", "Charles Edward", "Hi, how are you?", "1e9a12c1-e98c-4a83-a55a-32cc548a169d,1bc833808-05ed-455a-9d26-64fe1d96d62d", "2022-12-23T07:56:30.214162+00:00"),
("008e9dc4-f01d-4429-ba31-986d7e63cce8", "Ahmad Rosid", "Hi... are free today?", "1e9a12c1-e98c-4a83-a55a-32cc548a169d,1bc833808-05ed-455a-9d26-64fe1d96d62d", "2022-12-23T07:56:30.214162+00:00");
INSERT INTO conversations(id, user_id, room_id, content, created_at)
VALUES
("9aeab1a7-e063-40d1-a120-1f7585fa47d6", "1bc833808-05ed-455a-9d26-64fe1d96d62d", "f061383b-0393-4ce8-9a85-f31d03762263", "Hello", "2022-12-23T07:56:30.214162+00:00"),
("f4e54e70-736b-4a79-a622-3659b0b555e8", "1e9a12c1-e98c-4a83-a55a-32cc548a169d", "f061383b-0393-4ce8-9a85-f31d03762263", "Hi, how are you?", "2022-12-23T07:56:30.214162+00:00"),
("d3ea6e39-ed58-4613-8922-b78f14a2676a", "1bc833808-05ed-455a-9d26-64fe1d96d62d", "008e9dc4-f01d-4429-ba31-986d7e63cce8", "Hi... are free today?", "2022-12-23T07:56:30.214162+00:00");
现在让我们运行migration生成schema:
diesel migration run   
src/schema.rs文件如下:
diesel::table! {
    conversations (id) {
        id -> Text,
        room_id -> Text,
        user_id -> Text,
        content -> Text,
        created_at -> Text,
    }
}

diesel::table! {
    rooms (id) {
        id -> Text,
        name -> Text,
        last_message -> Text,
        participant_ids -> Text,
        created_at -> Text,
    }
}

diesel::table! {
    users (id) {
        id -> Text,
        username -> Text,
        phone -> Text,
        created_at -> Text,
    }
}

diesel::allow_tables_to_appear_in_same_query!(
    conversations,
    rooms,
    users,
); 
模型
让我们创建一些结构来存储所有的表。需要记住的一点是,结构体中属性的顺序应该与模式文件中的顺序相同。如果顺序不匹配,就会得到错误的数据。
src/models.rs文件内容如下:
use serde::{Deserialize, Serialize};
use crate::schema::*;
// 堆代码 duidaima.com
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable)]
pub struct User {
    pub id: String,
    pub username: String,
    pub phone: String,
    pub created_at: String
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Queryable, Insertable)]
pub struct Conversation {
    pub id: String,
    pub room_id: String,
    pub user_id: String,
    pub content: String,
    pub created_at: String
}

#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Insertable)]
pub struct Room {
    pub id: String,
    pub name: String,
    pub last_message: String,
    pub participant_ids: String,
    pub created_at: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewUser {
    pub username: String,
    pub phone: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewConversation {
    pub user_id: String,
    pub room_id: String,
    pub message: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoomResponse {
    pub room: Room,
    pub users: Vec<User>,
}
访问数据
现在,让我们通过src/db.rs文件从数据库中访问数据。首先,导入依赖项:
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    time::SystemTime,
};
use uuid::Uuid;
use crate::models::{Conversation, NewConversation, Room, RoomResponse, User};

type DbError = Box<dyn std::error::Error + Send + Sync>;
由于SQLite没有日期功能,我们将创建一个:
fn iso_date() -> String {
    let now = SystemTime::now();
    let now: DateTime<Utc> = now.into();
    return now.to_rfc3339();
}
1,通过电话号码查找用户
在这里,我们将建立一个查询,该查询实现一个简单的登录特性,并使我们能够通过电话号码查找用户。
pub fn find_user_by_phone(
    conn: &mut SqliteConnection,
    user_phone: String,
) -> Result<Option<User>, DbError> {
    use crate::schema::users::dsl::*;
    let user = users
        .filter(phone.eq(user_phone))
        .first::<User>(conn)
        .optional()?;
    Ok(user)
}
2,增加新用户
这是一个用于存储注册了我们应用程序的新用户,这也是身份验证系统的一部分。
pub fn insert_new_user(conn: &mut SqliteConnection, nm: &str, pn: &str) -> Result<User, DbError> {
    use crate::schema::users::dsl::*;
    let new_user = User {
        id: Uuid::new_v4().to_string(),
        username: nm.to_owned(),
        phone: pn.to_owned(),
        created_at: iso_date(),
    };
    diesel::insert_into(users).values(&new_user).execute(conn)?;
    Ok(new_user)
}
3,添加了新用户后,我们现在插入新的对话
pub fn insert_new_conversation(
    conn: &mut SqliteConnection,
    new: NewConversation,
) -> Result<Conversation, DbError> {
    use crate::schema::conversations::dsl::*;
    let new_conversation = Conversation {
        id: Uuid::new_v4().to_string(),
        user_id: new.user_id,
        room_id: new.room_id,
        content: new.message,
        created_at: iso_date(),
    };
    diesel::insert_into(conversations)
        .values(&new_conversation)
        .execute(conn)?;
    Ok(new_conversation)
}
4,查找聊天室和参与者
接下来,让我们建立一个查询从数据库中获取所有的聊天室和参与者。
pub fn get_all_rooms(conn: &mut SqliteConnection) -> Result<Vec<RoomResponse>, DbError> {
    use crate::schema::rooms;
    use crate::schema::users;
    let rooms_data: Vec<Room> = rooms::table.get_results(conn)?;
    let mut ids = HashSet::new();
    let mut rooms_map = HashMap::new();
    let data = rooms_data.to_vec();
    for room in &data {
        let user_ids = room
            .participant_ids
            .split(",")
            .into_iter()
            .collect::<Vec<_>>();
        for id in user_ids.to_vec() {
            ids.insert(id.to_string());
        }
        rooms_map.insert(room.id.to_string(), user_ids.to_vec());
    }
    let ids = ids.into_iter().collect::<Vec<_>>();
    let users_data: Vec<User> = users::table
        .filter(users::id.eq_any(ids))
        .get_results(conn)?;
    let users_map: HashMap<String, User> = HashMap::from_iter(
        users_data
            .into_iter()
            .map(|item| (item.id.to_string(), item)),
    );
    let response_rooms = rooms_data.into_iter().map(|room| {
        let users = rooms_map
            .get(&room.id.to_string())
            .unwrap()
            .into_iter()
            .map(|id| users_map.get(id.to_owned()).unwrap().clone())
            .collect::<Vec<_>>();
        return RoomResponse{ room, users };
    }).collect::<Vec<_>>();
    Ok(response_rooms)
}

pub fn find_user_by_uid(conn: &mut SqliteConnection, uid: Uuid) -> Result<Option<User>, DbError> {
    use crate::schema::users::dsl::*;

    let user = users
        .filter(id.eq(uid.to_string()))
        .first::<User>(conn)
        .optional()?;

    Ok(user)
}

pub fn get_conversation_by_room_uid(
    conn: &mut SqliteConnection,
    uid: Uuid,
) -> Result<Option<Vec<Conversation>>, DbError> {
    use crate::schema::conversations;

    let convo = conversations::table
        .filter(conversations::room_id.eq(uid.to_string()))
        .load(conn)
        .optional()?;

    Ok(convo)
}

WebSocket Server
现在,让我们编写Server,处理WebSocket连接。在src/server.rs文件中,写入如下代码:
use std::collections::{HashMap, HashSet};
use serde_json::json;
use actix::prelude::*;
use rand::{self, rngs::ThreadRng, Rng};
use crate::session;

#[derive(Message)]
#[rtype(result = "()")]
pub struct Message(pub String);

#[derive(Message)]
#[rtype(usize)]
pub struct Connect {
    pub addr: Recipient<Message>,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect {
    pub id: usize,
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct ClientMessage {
    pub id: usize,
    pub msg: String,
    pub room: String,
}

pub struct ListRooms;
impl actix::Message for ListRooms {
    type Result = Vec<String>;
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Join {
    pub id: usize,
    pub name: String,
}
接下来,让我们实现WebSocket连接的管理,这段代码将处理来自用户的所有消息,并将它们发送回聊天室中的参与者:
#[derive(Debug)]
pub struct ChatServer {
    sessions: HashMap<usize, Recipient<Message>>,
    rooms: HashMap<String, HashSet<usize>>,
    rng: ThreadRng,
}
impl ChatServer {
    pub fn new() -> ChatServer {
        let mut rooms = HashMap::new();
        rooms.insert("main".to_string(), HashSet::new());
        Self {
            sessions: HashMap::new(),
            rooms,
            rng: rand::thread_rng()
        }
    }
    fn send_message(&self, room: &str, message: &str, skip_id: usize) {
        if let Some(sessions) = self.rooms.get(room) {
            for id in sessions {
                if *id != skip_id {
                    if let Some(addr) = self.sessions.get(id) {
                        addr.do_send(Message(message.to_owned()));
                    }
                }
            }
        }
    }
}
impl Actor for ChatServer {
    type Context = Context<Self>;
}
impl Handler<Connect> for ChatServer {
    type Result = usize;
    fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
        let id = self.rng.gen::<usize>();
        self.sessions.insert(id, msg.addr);
        self.rooms
            .entry("main".to_string())
            .or_insert_with(HashSet::new)
            .insert(id);
        self.send_message("main", &json!({
            "value": vec![format!("{}", id)],
            "chat_type": session::ChatType::CONNECT
        }).to_string(), 0);
        id
    }
}
impl Handler<Disconnect> for ChatServer {
    type Result = ();
    fn handle(&mut self, msg: Disconnect, _: &mut Self::Context) -> Self::Result {
        let mut rooms: Vec<String> = vec![];
        if self.sessions.remove(&msg.id).is_some() {
            for (name, sessions) in &mut self.rooms {
                if sessions.remove(&msg.id) {
                    rooms.push(name.to_owned());
                }
            }
        }
        for room in rooms {
            self.send_message("main", &json!({
                "room": room,
                "value": vec![format!("Someone disconnect!")],
                "chat_type": session::ChatType::DISCONNECT
            }).to_string(), 0);
        }
    }
}
impl Handler<ClientMessage> for ChatServer {
    type Result = ();
    fn handle(&mut self, msg: ClientMessage, _: &mut Self::Context) -> Self::Result {
        self.send_message(&msg.room, &msg.msg, msg.id);
    }
}
impl Handler<ListRooms> for ChatServer {
    type Result = MessageResult<ListRooms>;
    fn handle(&mut self, _: ListRooms, _: &mut Self::Context) -> Self::Result {
        let mut rooms = vec![];
        for key in self.rooms.keys() {
            rooms.push(key.to_owned());
        }
        MessageResult(rooms)
    }
}
impl Handler<Join> for ChatServer {
    type Result = ();
    fn handle(&mut self, msg: Join, _: &mut Self::Context) -> Self::Result {
        let Join {id, name} = msg;
        let mut rooms = vec![];
        for (n, sessions) in &mut self.rooms {
            if sessions.remove(&id) {
                rooms.push(n.to_owned());
            }
        }
        for room in rooms {
            self.send_message(&room, &json!({
                "room": room,
                "value": vec![format!("Someone disconnect!")],
                "chat_type": session::ChatType::DISCONNECT
            }).to_string(), 0);
        }
        self.rooms
            .entry(name.clone())
            .or_insert_with(HashSet::new)
            .insert(id);
    }
}
处理用户会话
现在,让我们来处理用户会话。在这里,我们将接收一条消息,将其保存到数据库中,然后将其发送回聊天室中的参与者。
在src/session.rs文件中写入如下代码:
use std::time::{Duration, Instant};
use actix::prelude::*;
use actix_web::web;
use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use diesel::{
    prelude::*,
    r2d2::{self, ConnectionManager},
};
use crate::db;
use crate::models::NewConversation;
use crate::server;

const HEARBEET: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
HEARTBEAT是与客户端保持连接活动的持续时间,CLIENT_TIMEOUT是检查客户端是否连接超时的时间。

现在让我们创建一些结构体来存储我们需要的所有数据:
#[derive(Debug)]
pub struct WsChatSession {
    pub id: usize,
    pub hb: Instant,
    pub room: String,
    pub name: Option<String>,
    pub addr: Addr<server::ChatServer>,
    pub db_pool: web::Data<DbPool>,
}

#[derive(PartialEq, Serialize, Deserialize)]
pub enum ChatType {
    TYPING,
    TEXT,
    CONNECT,
    DISCONNECT,
}

#[derive(Serialize, Deserialize)]
struct ChatMessage {
    pub chat_type: ChatType,
    pub value: Vec<String>,
    pub room_id: String,
    pub user_id: String,
    pub id: usize,
}

WsChatSession:Actix Web角色的自定义实现
ChatMessage:定义发送给用户和从用户接收的对象
现在,让我们实现会话的Actor和流的Handler:
impl Actor for WsChatSession {
    type Context = ws::WebsocketContext<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        self.hb(ctx);
        let addr = ctx.address();
        self.addr
            .send(server::Connect {
                addr: addr.recipient(),
            })
            .into_actor(self)
            .then(|res, act, ctx| {
                match res {
                    Ok(res) => act.id = res,
                    _ => ctx.stop(),
                }
                fut::ready(())
            })
            .wait(ctx);
    }
    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        self.addr.do_send(server::Disconnect { id: self.id });
        Running::Stop
    }
}
impl Handler<server::Message> for WsChatSession {
    type Result = ();
    fn handle(&mut self, msg: server::Message, ctx: &mut Self::Context) -> Self::Result {
        ctx.text(msg.0);
    }
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        let msg = match item {
            Err(_) => {
                ctx.stop();
                return;
            }
            Ok(msg) => msg,
        };
        match msg {
            ws::Message::Ping(msg) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            ws::Message::Pong(_) => {
                self.hb = Instant::now();
            }
            ws::Message::Text(text) => {
                let data_json = serde_json::from_str::<ChatMessage>(&text.to_string());
                if let Err(err) = data_json {
                    println!("{err}");
                    println!("Failed to parse message: {text}");
                    return;
                }
                let input = data_json.as_ref().unwrap();
                match &input.chat_type {
                    ChatType::TYPING => {
                        let chat_msg = ChatMessage {
                            chat_type: ChatType::TYPING,
                            value: input.value.to_vec(),
                            id: self.id,
                            room_id: input.room_id.to_string(),
                            user_id: input.user_id.to_string(),
                        };
                        let msg = serde_json::to_string(&chat_msg).unwrap();
                        self.addr.do_send(server::ClientMessage {
                            id: self.id,
                            msg,
                            room: self.room.clone(),
                        })
                    }
                    ChatType::TEXT => {
                        let input = data_json.as_ref().unwrap();
                        let chat_msg = ChatMessage {
                            chat_type: ChatType::TEXT,
                            value: input.value.to_vec(),
                            id: self.id,
                            room_id: input.room_id.to_string(),
                            user_id: input.user_id.to_string(),
                        };
                        let mut conn = self.db_pool.get().unwrap();
                        let new_conversation = NewConversation {
                            user_id: input.user_id.to_string(),
                            room_id: input.room_id.to_string(),
                            message: input.value.join(""),
                        };
                        let _ = db::insert_new_conversation(&mut conn, new_conversation);
                        let msg = serde_json::to_string(&chat_msg).unwrap();
                        self.addr.do_send(server::ClientMessage {
                            id: self.id,
                            msg,
                            room: self.room.clone(),
                        })
                    }
                    _ => {}
                }
            }
            ws::Message::Binary(_) => println!("Unsupported binary"),
            ws::Message::Close(reason) => {
                ctx.close(reason);
                ctx.stop();
            }
            ws::Message::Continuation(_) => {
                ctx.stop();
            }
            ws::Message::Nop => (),
        }
    }
}
impl WsChatSession {
    fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
        ctx.run_interval(HEARBEET, |act, ctx| {
            if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
                act.addr.do_send(server::Disconnect { id: act.id });
                ctx.stop();
                return;
            }
            ctx.ping(b"");
        });
    }
}

创建路由
现在让我们为服务器创建路由,由于我们使用REST HTTP和WebSocket服务器,因此我们可以轻松地将所有内容放在一个文件中。
在src/routes.rs文件中写入以下代码:
use std::time::Instant;
use actix::*;
use actix_files::NamedFile;
use actix_web::{get, post, web, Error, HttpRequest, HttpResponse, Responder};
use actix_web_actors::ws;
use diesel::{
    prelude::*,
    r2d2::{self, ConnectionManager},
};
use serde_json::json;
use uuid::Uuid;
use crate::db;
use crate::models;
use crate::server;
use crate::session;

type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

pub async fn index() -> impl Responder {
    NamedFile::open_async("./static/index.html").await.unwrap()
}

pub async fn chat_server(
    req: HttpRequest,
    stream: web::Payload,
    pool: web::Data<DbPool>,
    srv: web::Data<Addr<server::ChatServer>>,
) -> Result<HttpResponse, Error> {
    ws::start(
        session::WsChatSession {
            id: 0,
            hb: Instant::now(),
            room: "main".to_string(),
            name: None,
            addr: srv.get_ref().clone(),
            db_pool: pool,
        },
        &req,
        stream
    )
}
接下来,我们需要向我们的路由添加一个REST API,以便使我们的聊天工作获得必要的数据:
#[post("/users/create")]
pub async fn create_user(
    pool: web::Data<DbPool>,
    form: web::Json<models::NewUser>,
) -> Result<HttpResponse, Error> {
    let user = web::block(move || {
        let mut conn = pool.get()?;
        db::insert_new_user(&mut conn, &form.username, &form.phone)
    })
    .await?
    .map_err(actix_web::error::ErrorUnprocessableEntity)?;
    Ok(HttpResponse::Ok().json(user))
}
#[get("/users/{user_id}")]
pub async fn get_user_by_id(
    pool: web::Data<DbPool>,
    id: web::Path<Uuid>,
) -> Result<HttpResponse, Error> {
    let user_id = id.to_owned();
    let user = web::block(move || {
        let mut conn = pool.get()?;
        db::find_user_by_uid(&mut conn, user_id)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if let Some(user) = user {
        Ok(HttpResponse::Ok().json(user))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": format!("No user found with phone: {id}")
            })
            .to_string(),
        );
        Ok(res)
    }
}
#[get("/conversations/{uid}")]
pub async fn get_conversation_by_id(
    pool: web::Data<DbPool>,
    uid: web::Path<Uuid>,
) -> Result<HttpResponse, Error> {
    let room_id = uid.to_owned();
    let conversations = web::block(move || {
        let mut conn = pool.get()?;
        db::get_conversation_by_room_uid(&mut conn, room_id)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if let Some(data) = conversations {
        Ok(HttpResponse::Ok().json(data))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": format!("No conversation with room_id: {room_id}")
            })
            .to_string(),
        );
        Ok(res)
    }
}
#[get("/users/phone/{user_phone}")]
pub async fn get_user_by_phone(
    pool: web::Data<DbPool>,
    phone: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let user_phone = phone.to_string();
    let user = web::block(move || {
        let mut conn = pool.get()?;
        db::find_user_by_phone(&mut conn, user_phone)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if let Some(user) = user {
        Ok(HttpResponse::Ok().json(user))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": format!("No user found with phone: {}", phone.to_string())
            })
            .to_string(),
        );
        Ok(res)
    }
}
#[get("/rooms")]
pub async fn get_rooms(
    pool: web::Data<DbPool>,
) -> Result<HttpResponse, Error> {
    let rooms = web::block(move || {
        let mut conn = pool.get()?;
        db::get_all_rooms(&mut conn)
    })
    .await?
    .map_err(actix_web::error::ErrorInternalServerError)?;
    if !rooms.is_empty() {
        Ok(HttpResponse::Ok().json(rooms))
    } else {
        let res = HttpResponse::NotFound().body(
            json!({
                "error": 404,
                "message": "No rooms available at the moment.",
            })
            .to_string(),
        );
        Ok(res)
    }
}
最后,让我们为WebSocket服务器编写入口点main.rs,代码如下:
#[macro_use]
extern crate diesel;
use actix::*;
use actix_cors::Cors;
use actix_files::Files;
use actix_web::{web, http, App, HttpServer};
use diesel::{
    prelude::*,
    r2d2::{self, ConnectionManager},
};

mod db;
mod models;
mod routes;
mod schema;
mod server;
mod session;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let server = server::ChatServer::new().start();
    let conn_spec = "chat.db";
    let manager = ConnectionManager::<SqliteConnection>::new(conn_spec);
    let pool = r2d2::Pool::builder().build(manager).expect("Failed to create pool.");
    let server_addr = "127.0.0.1";
    let server_port = 8080;
    let app = HttpServer::new(move || {
        let cors = Cors::default()
            .allowed_origin("http://localhost:3000")
            .allowed_origin("http://localhost:8080")
            .allowed_methods(vec!["GET", "POST"])
            .allowed_headers(vec![http::header::AUTHORIZATION, http::header::ACCEPT])
            .allowed_header(http::header::CONTENT_TYPE)
            .max_age(3600);
        App::new()
            .app_data(web::Data::new(server.clone()))
            .app_data(web::Data::new(pool.clone()))
            .wrap(cors)
            .service(web::resource("/").to(routes::index))
            .route("/ws", web::get().to(routes::chat_server))
            .service(routes::create_user)
            .service(routes::get_user_by_id)
            .service(routes::get_user_by_phone)
            .service(routes::get_conversation_by_id)
            .service(routes::get_rooms)
            .service(Files::new("/", "./static"))
    })
    .workers(2)
    .bind((server_addr, server_port))?
    .run();
    println!("Server running at http://{server_addr}:{server_port}/");
    app.await
}
actix_cors:用于调试UI,我们将接受来自localhost:3000或localhost:8080的POST和GET请求
actix_web:用于Actix Web包中所有与http相关的特性
actix_files:用于在路由中嵌入静态文件
diesel:用于查询SQLite数据库中的数据。如果你愿意,也可以将其更改为Postgres或MySQL。
serde_json:用于解析我们发送到React应用程序的JSON数据
用户评论