• 如何实现Rust应用程序的分布式跟踪以及如何使用它来监视应用程序的性能
  • 发布于 2个月前
  • 316 热度
    0 评论
在这篇文章中,我们将从DevOps的角度展示如何监控Rust应用程序并生成跟踪数据。我们使用广泛认可的Jaeger UI进行分析跟踪,以深入了解应用程序的行为。并从这些痕迹中得出RED(率、错误和持续时间)指标,并在Grafana中监控它们。

我们的演练需要做到以下4点:
f.用Actix构建简单的web API。
2.推送痕迹数据和指标数据到Quickwit。
3.使用Jaeger UI检测、诊断和解决问题。
4.在Grafana中监控应用程序的RED指标(率,错误,持续时间)。

在我们深入讨论细节之前,请确保你的系统上安装了以下软件并正常运行:
1. Rust 1.68+
2. Docker

3. docker-compose

构建和测试Rust应用程序
我们将使用Actix web框架创建一个基本的Rust应用程序,它将从JSONPlaceholder网站的公共web API获取帖子及其评论,并将其显示为JSON。
项目初始化和核心依赖项
新建一个名为rust-app-tracing的目录并在目录下创建一个Rust项目:
cargo new web-api
在Cargo.toml文件中加入以下依赖项:
[dependencies]
actix-web = "4.1.0"
actix-web-opentelemetry = "0.13.0"
anyhow = "1.0.71"
futures = "0.3.28"
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["reqwest-client", "reqwest-rustls", "http-proto"] }
opentelemetry-semantic-conventions = "0.11.0"
reqwest = {version = "0.11.18", features = ["json"] }
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
tokio = { version = "1.24", features = ["full"] }
tracing = "0.1.36"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8.5"
log = "0.4.17"
tracing-actix-web = "0.7"
actix-web-opentelemetry:actix-web框架的open-telemetry扩展。
openelemetry:Rust的核心open-telemetry SDK,包括跟踪和度量。
opentelemetry-otlp:提供各种open-telemetry的输出。

web API应用程序代码
首先,创建一个名为src/telemetry.rs的文件来配置应用程序的跟踪。我们将在其中设置所有的跟踪配置。
use opentelemetry::{global, KeyValue};
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::{trace, Resource};
use opentelemetry_otlp::WithExportConfig;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::Registry;
use tracing_subscriber::{prelude::*, EnvFilter};

const SERVICE_NAME: &'static str = "quickwit-jaeger-demo";

pub fn init_telemetry(exporter_endpoint: &str) {
    // 堆代码 duidaima.com
    // 创建 gRPC 输出
    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(exporter_endpoint);

    // 定义跟踪器
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(
            trace::config().with_resource(Resource::new(vec![KeyValue::new(
                opentelemetry_semantic_conventions::resource::SERVICE_NAME,
                SERVICE_NAME.to_string(),
            )])),
        )
        .install_batch(opentelemetry::runtime::Tokio)
        .expect("Error: Failed to initialize the tracer.");

    // 定义订阅者
    let subscriber = Registry::default();

    // 定义过滤器,基于级别过滤跟踪(trace, debug, info, warn, error)
    let level_filter_layer = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"));

    // 添加我们自定义配置的跟踪器
    let tracing_layer = tracing_opentelemetry::layer().with_tracer(tracer);

    // 用于打印到标准输出
    let formatting_layer = BunyanFormattingLayer::new(
        SERVICE_NAME.to_string(),
        std::io::stdout,
    );

    global::set_text_map_propagator(TraceContextPropagator::new());

    subscriber
        .with(level_filter_layer)
        .with(tracing_layer)
        .with(JsonStorageLayer)
        .with(formatting_layer)
        .init()
}
接下来,让我们实现API端点,同时向处理程序函数添加一些检测。重要的是要注意,我们的重点不是这个应用程序做什么,而是从应用程序生成有意义的和可利用的跟踪数据。

首先,我们创建一个src/models.rs文件,用于序列化和反序列化帖子和评论:
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Post {
    pub user_id: i64,
    pub id: i64,
    pub title: String,
    pub body: String,
    #[serde(default)]
    pub comments: Vec<Comment>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Comment {
    pub post_id: i64,
    pub id: i64,
    pub name: String,
    pub email: String,
    pub body: String,
}
接下来,让我们实现API端点的处理程序。注意instrument属性装饰了一些函数。这就是我们如何启用函数的跟踪及对它执行后续任务的跟踪。

创建一个src/lib.rs文件:
mod telemetry;
mod models;

use futures::StreamExt;
use rand::seq::SliceRandom;
pub use telemetry::init_telemetry;
pub use models::{Post, Comment};

use anyhow::anyhow;
use reqwest::{Client, StatusCode};
use serde::{de::DeserializeOwned};

use actix_web::{get, web, Error, HttpResponse};
use tracing::instrument;

const BASE_API_URL: &'static str = "https://jsonplaceholder.typicode.com";

// web api: /posts
pub fn fetch_posts_service() -> actix_web::Scope {
    web::scope("/posts")
        .service(get_posts)
}

#[instrument(level = "info", name = "get_posts", skip_all)]
#[get("")]
async fn get_posts() -> Result<HttpResponse, Error> {
    // 随机模拟请求处理中的错误
    let choices = [200, 400, 401, 200, 500, 501, 200, 500];
    let mut rng = rand::thread_rng();
    let choice = choices.choose(&mut rng)
        .unwrap()
        .clone();
    match choice {
        400..=401 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())),
        500..=501 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())),
        _ => {
            let posts = fetch_posts(20)
                .await
                .map_err(actix_web::error::ErrorInternalServerError)?;
            Ok(HttpResponse::Ok().json(posts))
        }
    }
}

// 获取帖子
#[instrument(level = "info", name = "fetch_posts")]
async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> {
    let client = Client::new();
    let url = format!("{}/posts", BASE_API_URL);
    let mut posts: Vec<Post> = request_url(&client, &url).await?;
    posts.truncate(limit);
    let post_idx_to_ids: Vec<(usize, i64)> = posts.iter().enumerate().map(|(idx, post)| (idx, post.id)).collect();

    // 获取帖子评论
    for (index, post_id) in post_idx_to_ids {
        let comments = fetch_comments(&client, post_id).await?;
        posts[index].comments = comments
    }

    Ok(posts)
}

// 获取特定帖子的评论
#[instrument(level = "info", name = "fetch_comments", skip(client))]
async fn fetch_comments(client: &Client, post_id: i64) ->  anyhow::Result<Vec<Comment>> {
    let url = format!("{}/posts/{}/comments", BASE_API_URL, post_id);
    let comments: Vec<Comment> = request_url(&client, &url).await?;
    Ok(comments)
}

// 发送get请求和反序列化json响应
async fn request_url<T: DeserializeOwned>(client: &Client, url: &str) -> anyhow::Result<T> {
    let response = client.get(url)
        .send()
        .await?;
    match response.status() {
        reqwest::StatusCode::OK =>
            response.json::<T>()
            .await
            .map_err(|err| anyhow!(err.to_string()))
        ,
        _ => Err(anyhow!(format!("Request error with statusCode `{}`", response.status()))),
    }
}
在上面的代码中,我们只发送跟踪数据。还可以使用功能强大的日志收集器收集日志并向接收后端中发送。

最后,我们编写src/main.rs文件,用于启动服务:
use actix_web::{App, HttpServer};

use tracing_actix_web::TracingLogger;
use web_api::{init_telemetry, fetch_posts_service};

const EXPORTER_ENDPOINT: &'static str = "http://localhost:7281";

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    init_telemetry(EXPORTER_ENDPOINT);

    HttpServer::new(move || {
        App::new()
            .wrap(TracingLogger::default())
            .service(fetch_posts_service())
    })
    .bind(("127.0.0.1", 9000))?
    .run()
    .await
}
执行cargo run,然后在浏览器中输入:http://127.0.0.1:9000/posts,如图:

监控Rust应用程序
使用Quickwit来提取跟踪数据

我们将创建一个rust-app-tracing/docker-compose.yaml文件来简化Quickwit、Jaeger和Grafana之间的设置。下面的docker-compose文件包含所有必要的配置。
version: '3'
services:
  quickwit:
    image: quickwit/quickwit:latest
    command: run
    restart: always
    environment:
      QW_ENABLE_OTLP_ENDPOINT: true 
      QW_ENABLE_JAEGER_ENDPOINT: true 
    ports:
      - '7280:7280'
      - '7281:7281'
    volumes:
      - ./qwdata:/quickwit/qwdata

  jaeger:
    image: jaegertracing/jaeger-query:latest
    restart: always
    depends_on:
      - quickwit
    environment:
      SPAN_STORAGE_TYPE: 'grpc-plugin'
      GRPC_STORAGE_SERVER: 'quickwit:7281'
    ports:
      - '16686:16686'

  grafana:
    image: grafana/grafana-enterprise:latest
    restart: always
    user: root
    depends_on:
      - quickwit
    environment:
      GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: 'quickwit-quickwit-datasource'
    ports:
      - '3000:3000'
    volumes:
      - ./grafana-storage:/var/lib/grafana 
QW_ENABLE_OTLP_ENDPOINT:允许Quickwit接受和提取跟踪数据和日志数据。
SPAN_STORAGE_TYPE,GRPC_STORAGE_SERVER,QW_ENABLE_JAEGER_ENDPOINT:允许Jaeger从Quickwit提取跟踪数据和日志数据用于分析目的。
GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS:允许我们在Grafana中加载特定的插件。

在rust-app-tracing目录下创建用于存储Quickwit数据的qwdata目录。然后,在rust-app-tracing目录下创建grafana-storage/plugins目录,下载并放置Quickwit Grafana数据源插件在这个位置。
wget https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.2.0/quickwit-quickwit-datasource-0.2.0.zip \
&& mkdir -p grafana-storage/plugins \
&& unzip quickwit-quickwit-datasource-0.2.0.zip -d grafana-storage/plugins
现在让我们运行以下命令启动所有服务(Quickwit, Jaeger, Grafana):
docker compose up -d
现在进入web-api目录,运行我们的web应用程序:
cargo run 
执行如下命令:
curl -X GET http://localhost:9000/posts
等待大约10秒,新跟踪将被编入索引并可用于搜索。
现在可以通过CURL搜索otel-traces-v0_6索引来检查Quickwit是否已经索引了跟踪数据。
curl -X POST http://localhost:7280/api/v1/otel-traces-v0_6/search -H 'Content-Type: application/json' -d '{ "query": "service_name:quickwit-jaeger-demo" }'
你也可以使用Quickwit UI查看数据:http://localhost:7280/ui/search,

如图:

使用Jaeger UI分析痕迹
Jaeger容器已经运行,通过 http://localhost:16686 地址就可以可视化的查看应用程序的跟踪数据。

从上面的截图中可以看到,我们依次为每个帖子获取评论。也就是说,我们一个接一个地提出二十个请求,这使得整个请求处理时间更长。我们能做得更好吗?让我们利用Tokio和Rust futures crate的异步流特性来并发地获取评论。

修改fetch_posts函数,以便以批处理的方式并发地运行请求,每次处理10个请求。这应该能加快一点速度。
// 获取帖子
#[instrument(level = "info", name = "fetch_posts")]
async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> {
    let client = Client::new();
    let url = format!("{}/posts", BASE_API_URL);
    let mut posts: Vec<Post> = request_url(&client, &url).await?;
    posts.truncate(limit);
    let post_idx_to_ids: Vec<(usize, i64)> = posts.iter().enumerate().map(|(idx, post)| (idx, post.id)).collect();

    // 获取帖子评论
    // for (index, post_id) in post_idx_to_ids {
    //     let comments = fetch_comments(&client, post_id).await?;
    //     posts[index].comments = comments
    // }

    // 并发获取帖子评论
    let tasks: Vec<_> = post_idx_to_ids
        .into_iter()
        .map(|(index, post_id)| {
            let moved_client = client.clone();
            async move {
                let comments_fetch_result = fetch_comments(&moved_client, post_id).await;
                (index, comments_fetch_result)
            }
        })
        .collect();
    let mut stream = futures::stream::iter(tasks)
        .buffer_unordered(10);
    while let Some((index, comments_fetch_result)) = stream.next().await {
        let comments = comments_fetch_result?;
        posts[index].comments = comments;
    }

    Ok(posts)
}
重新运行程序,再次查看,发现时间缩短了。

使用Grafana监控应用程序指标
Jaeger适合集中检查单个痕迹。但是,如果我们想要监视服务的延迟,该怎么办呢?如果我们想用给定的跟踪元数据计算错误或请求的数量,该怎么办?
这就是Grafana仪表板发挥作用的地方,我们希望从追踪的数据中构建RED指标,并在Grafana中可视化它们。

在浏览器中输入 http://localhost:3000/login,使用admin作为用户名和密码登录。登录后,我们可以连接到Quickwit,并使用新发布的Quickwit数据源插件查询我们的应用程序跟踪数据。

总结
在这篇文章中,我们深入探讨了如何实现Rust应用程序的分布式跟踪以及如何使用它来监视应用程序的性能。
用户评论