3. docker-compose
构建和测试Rust应用程序
cargo new web-api
在Cargo.toml文件中加入以下依赖项:
# Crates used by the web-api example: the actix-web framework, the reqwest HTTP
# client, serde for (de)serialization, and the tracing / OpenTelemetry stack.
[dependencies]
actix-web = "4.1.0"
actix-web-opentelemetry = "0.13.0"
anyhow = "1.0.71"
futures = "0.3.28"
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["reqwest-client", "reqwest-rustls", "http-proto"] }
opentelemetry-semantic-conventions = "0.11.0"
reqwest = {version = "0.11.18", features = ["json"] }
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
tokio = { version = "1.24", features = ["full"] }
tracing = "0.1.36"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8.5"
log = "0.4.17"
tracing-actix-web = "0.7"

actix-web-opentelemetry:actix-web框架的open-telemetry扩展。
use opentelemetry::{global, KeyValue}; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::{trace, Resource}; use opentelemetry_otlp::WithExportConfig; use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; use tracing_subscriber::Registry; use tracing_subscriber::{prelude::*, EnvFilter}; const SERVICE_NAME: &'static str = "quickwit-jaeger-demo"; pub fn init_telemetry(exporter_endpoint: &str) { // 堆代码 duidaima.com // 创建 gRPC 输出 let exporter = opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(exporter_endpoint); // 定义跟踪器 let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter(exporter) .with_trace_config( trace::config().with_resource(Resource::new(vec![KeyValue::new( opentelemetry_semantic_conventions::resource::SERVICE_NAME, SERVICE_NAME.to_string(), )])), ) .install_batch(opentelemetry::runtime::Tokio) .expect("Error: Failed to initialize the tracer."); // 定义订阅者 let subscriber = Registry::default(); // 定义过滤器,基于级别过滤跟踪(trace, debug, info, warn, error) let level_filter_layer = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); // 添加我们自定义配置的跟踪器 let tracing_layer = tracing_opentelemetry::layer().with_tracer(tracer); // 用于打印到标准输出 let formatting_layer = BunyanFormattingLayer::new( SERVICE_NAME.to_string(), std::io::stdout, ); global::set_text_map_propagator(TraceContextPropagator::new()); subscriber .with(level_filter_layer) .with(tracing_layer) .with(JsonStorageLayer) .with(formatting_layer) .init() }接下来,让我们实现API端点,同时向处理程序函数添加一些检测。重要的是要注意,我们的重点不是这个应用程序做什么,而是从应用程序生成有意义的和可利用的跟踪数据。
use serde::{Serialize, Deserialize};

/// A post, (de)serialized with camelCase field names (e.g. `userId`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Post {
    pub user_id: i64,
    pub id: i64,
    pub title: String,
    pub body: String,
    /// Comments attached to the post; falls back to the default (empty) value
    /// when the field is absent from the input.
    #[serde(default)]
    pub comments: Vec<Comment>,
}

/// A comment on a post, (de)serialized with camelCase field names (e.g. `postId`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Comment {
    pub post_id: i64,
    pub id: i64,
    pub name: String,
    pub email: String,
    pub body: String,
}

接下来,让我们实现API端点的处理程序。注意instrument属性装饰了一些函数。这就是我们如何启用函数的跟踪及对它执行后续任务的跟踪。
mod telemetry; mod models; use futures::StreamExt; use rand::seq::SliceRandom; pub use telemetry::init_telemetry; pub use models::{Post, Comment}; use anyhow::anyhow; use reqwest::{Client, StatusCode}; use serde::{de::DeserializeOwned}; use actix_web::{get, web, Error, HttpResponse}; use tracing::instrument; const BASE_API_URL: &'static str = "https://jsonplaceholder.typicode.com"; // web api: /posts pub fn fetch_posts_service() -> actix_web::Scope { web::scope("/posts") .service(get_posts) } #[instrument(level = "info", name = "get_posts", skip_all)] #[get("")] async fn get_posts() -> Result<HttpResponse, Error> { // 随机模拟请求处理中的错误 let choices = [200, 400, 401, 200, 500, 501, 200, 500]; let mut rng = rand::thread_rng(); let choice = choices.choose(&mut rng) .unwrap() .clone(); match choice { 400..=401 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())), 500..=501 => Ok(HttpResponse::new(StatusCode::from_u16(choice).unwrap())), _ => { let posts = fetch_posts(20) .await .map_err(actix_web::error::ErrorInternalServerError)?; Ok(HttpResponse::Ok().json(posts)) } } } // 获取帖子 #[instrument(level = "info", name = "fetch_posts")] async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> { let client = Client::new(); let url = format!("{}/posts", BASE_API_URL); let mut posts: Vec<Post> = request_url(&client, &url).await?; posts.truncate(limit); let post_idx_to_ids: Vec<(usize, i64)> = posts.iter().enumerate().map(|(idx, post)| (idx, post.id)).collect(); // 获取帖子评论 for (index, post_id) in post_idx_to_ids { let comments = fetch_comments(&client, post_id).await?; posts[index].comments = comments } Ok(posts) } // 获取特定帖子的评论 #[instrument(level = "info", name = "fetch_comments", skip(client))] async fn fetch_comments(client: &Client, post_id: i64) -> anyhow::Result<Vec<Comment>> { let url = format!("{}/posts/{}/comments", BASE_API_URL, post_id); let comments: Vec<Comment> = request_url(&client, &url).await?; Ok(comments) } // 发送get请求和反序列化json响应 async fn 
request_url<T: DeserializeOwned>(client: &Client, url: &str) -> anyhow::Result<T> { let response = client.get(url) .send() .await?; match response.status() { reqwest::StatusCode::OK => response.json::<T>() .await .map_err(|err| anyhow!(err.to_string())) , _ => Err(anyhow!(format!("Request error with statusCode `{}`", response.status()))), } }在上面的代码中,我们只发送跟踪数据。还可以使用功能强大的日志收集器收集日志并向接收后端中发送。
use actix_web::{App, HttpServer};
use tracing_actix_web::TracingLogger;
use web_api::{init_telemetry, fetch_posts_service};

/// Endpoint the trace exporter sends spans to.
const EXPORTER_ENDPOINT: &'static str = "http://localhost:7281";

/// Initializes telemetry, then serves the posts API on `127.0.0.1:9000`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    init_telemetry(EXPORTER_ENDPOINT);

    // Application factory: request tracing middleware plus the `/posts` scope.
    let app_factory = || {
        App::new()
            .wrap(TracingLogger::default())
            .service(fetch_posts_service())
    };

    let server = HttpServer::new(app_factory).bind(("127.0.0.1", 9000))?;
    server.run().await
}

执行cargo run,然后在浏览器中输入:http://127.0.0.1:9000/posts,如图:
version: '3'

services:
  # Quickwit: receives and indexes the traces.
  quickwit:
    image: quickwit/quickwit:latest
    command: run
    restart: always
    environment:
      # Quoted so the YAML parser keeps them as strings instead of booleans.
      QW_ENABLE_OTLP_ENDPOINT: 'true'
      QW_ENABLE_JAEGER_ENDPOINT: 'true'
    ports:
      - '7280:7280'
      - '7281:7281'
    volumes:
      - ./qwdata:/quickwit/qwdata

  # Jaeger query UI, using Quickwit as its gRPC storage backend.
  jaeger:
    image: jaegertracing/jaeger-query:latest
    restart: always
    depends_on:
      - quickwit
    environment:
      SPAN_STORAGE_TYPE: 'grpc-plugin'
      GRPC_STORAGE_SERVER: 'quickwit:7281'
    ports:
      - '16686:16686'

  # Grafana with the unsigned Quickwit datasource plugin allowed.
  grafana:
    image: grafana/grafana-enterprise:latest
    restart: always
    user: root
    depends_on:
      - quickwit
    environment:
      GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: 'quickwit-quickwit-datasource'
    ports:
      - '3000:3000'
    volumes:
      - ./grafana-storage:/var/lib/grafana

QW_ENABLE_OTLP_ENDPOINT:允许Quickwit接受和提取跟踪数据和日志数据。
wget https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.2.0/quickwit-quickwit-datasource-0.2.0.zip \
  && mkdir -p grafana-storage/plugins \
  && unzip quickwit-quickwit-datasource-0.2.0.zip -d grafana-storage/plugins
现在让我们运行以下命令启动所有服务(Quickwit, Jaeger, Grafana):
docker compose up -d
现在进入web-api目录,运行我们的web应用程序:
cargo run
执行如下命令:
curl -X GET http://localhost:9000/posts
等待大约10秒,新跟踪将被编入索引并可用于搜索。
curl -X POST http://localhost:7280/api/v1/otel-traces-v0_6/search -H 'Content-Type: application/json' -d '{ "query": "service_name:quickwit-jaeger-demo" }'
你也可以使用Quickwit UI查看数据:http://localhost:7280/ui/search,
// 获取帖子 #[instrument(level = "info", name = "fetch_posts")] async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> { let client = Client::new(); let url = format!("{}/posts", BASE_API_URL); let mut posts: Vec<Post> = request_url(&client, &url).await?; posts.truncate(limit); let post_idx_to_ids: Vec<(usize, i64)> = posts.iter().enumerate().map(|(idx, post)| (idx, post.id)).collect(); // 获取帖子评论 // for (index, post_id) in post_idx_to_ids { // let comments = fetch_comments(&client, post_id).await?; // posts[index].comments = comments // } // 并发获取帖子评论 let tasks: Vec<_> = post_idx_to_ids .into_iter() .map(|(index, post_id)| { let moved_client = client.clone(); async move { let comments_fetch_result = fetch_comments(&moved_client, post_id).await; (index, comments_fetch_result) } }) .collect(); let mut stream = futures::stream::iter(tasks) .buffer_unordered(10); while let Some((index, comments_fetch_result)) = stream.next().await { let comments = comments_fetch_result?; posts[index].comments = comments; } Ok(posts) }重新运行程序,再次查看,发现时间缩短了。