闽公网安备 35020302035485号
3. docker-compose
构建和测试Rust应用程序
cargo new web-api
在Cargo.toml文件中加入以下依赖项:
[dependencies]
actix-web = "4.1.0"
actix-web-opentelemetry = "0.13.0"
anyhow = "1.0.71"
futures = "0.3.28"
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["reqwest-client", "reqwest-rustls", "http-proto"] }
opentelemetry-semantic-conventions = "0.11.0"
reqwest = {version = "0.11.18", features = ["json"] }
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
tokio = { version = "1.24", features = ["full"] }
tracing = "0.1.36"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8.5"
log = "0.4.17"
tracing-actix-web = "0.7"
actix-web-opentelemetry:actix-web框架的open-telemetry扩展。
use opentelemetry::{global, KeyValue};
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::{trace, Resource};
use opentelemetry_otlp::WithExportConfig;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::Registry;
use tracing_subscriber::{prelude::*, EnvFilter};
const SERVICE_NAME: &'static str = "quickwit-jaeger-demo";
/// Installs the process-wide tracing pipeline for this service.
///
/// Spans are exported in batches over OTLP/gRPC to `exporter_endpoint` and are
/// also written to stdout as Bunyan-formatted JSON. Verbosity comes from the
/// default `EnvFilter` environment variable and falls back to `INFO` when that
/// variable is missing or cannot be parsed.
///
/// # Panics
///
/// Panics if the OTLP tracer pipeline cannot be installed.
/// NOTE(review): `init()` is also expected to panic if a global subscriber has
/// already been set, so call this once at startup — confirm against the
/// `tracing-subscriber` version in use.
pub fn init_telemetry(exporter_endpoint: &str) {
    // gRPC (tonic) OTLP exporter pointed at the collector endpoint.
    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(exporter_endpoint);

    // Every exported span carries the service name as a resource attribute.
    let resource = Resource::new(vec![KeyValue::new(
        opentelemetry_semantic_conventions::resource::SERVICE_NAME,
        SERVICE_NAME.to_string(),
    )]);

    // Batching tracer driven by the Tokio runtime.
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(trace::config().with_resource(resource))
        .install_batch(opentelemetry::runtime::Tokio)
        .expect("Error: Failed to initialize the tracer.");

    // Level-based filter (trace, debug, info, warn, error). The fallback is
    // built lazily so it is only constructed when the environment lookup fails.
    let level_filter_layer =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO"));

    // Bridges `tracing` spans into the OpenTelemetry tracer configured above.
    let tracing_layer = tracing_opentelemetry::layer().with_tracer(tracer);

    // Prints structured JSON records to standard output.
    let formatting_layer = BunyanFormattingLayer::new(SERVICE_NAME.to_string(), std::io::stdout);

    global::set_text_map_propagator(TraceContextPropagator::new());

    Registry::default()
        .with(level_filter_layer)
        .with(tracing_layer)
        .with(JsonStorageLayer)
        .with(formatting_layer)
        .init();
}
接下来,让我们实现API端点,同时向处理程序函数添加一些检测。重要的是要注意,我们的重点不是这个应用程序做什么,而是从应用程序生成有意义的和可利用的跟踪数据。
use serde::{Serialize, Deserialize};
/// A post as exchanged with the remote JSON API.
///
/// Field names are (de)serialized in camelCase, so `user_id` maps to the
/// `userId` key in JSON.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Post {
/// Serialized as `userId`.
pub user_id: i64,
/// Identifier of the post; used to build the comments URL for this post.
pub id: i64,
/// Title text of the post.
pub title: String,
/// Body text of the post.
pub body: String,
/// Comments attached to the post. Defaults to an empty list when the key is
/// absent from the incoming JSON; filled in afterwards by `fetch_posts`.
#[serde(default)]
pub comments: Vec<Comment>,
}
/// A comment as exchanged with the remote JSON API.
///
/// Field names are (de)serialized in camelCase, so `post_id` maps to the
/// `postId` key in JSON.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Comment {
/// Serialized as `postId`.
pub post_id: i64,
/// Identifier of the comment.
pub id: i64,
/// Serialized as `name`.
pub name: String,
/// Serialized as `email`.
pub email: String,
/// Body text of the comment.
pub body: String,
}
接下来,让我们实现API端点的处理程序。注意instrument属性装饰了一些函数。这就是我们如何启用函数的跟踪及对它执行后续任务的跟踪。
mod telemetry;
mod models;
use futures::StreamExt;
use rand::seq::SliceRandom;
pub use telemetry::init_telemetry;
pub use models::{Post, Comment};
use anyhow::anyhow;
use reqwest::{Client, StatusCode};
use serde::{de::DeserializeOwned};
use actix_web::{get, web, Error, HttpResponse};
use tracing::instrument;
const BASE_API_URL: &'static str = "https://jsonplaceholder.typicode.com";
/// Builds the actix-web scope served under `/posts`.
///
/// The scope registers the `get_posts` handler, so mounting the returned value
/// on an `App` exposes `GET /posts`.
pub fn fetch_posts_service() -> actix_web::Scope {
    let posts_scope = web::scope("/posts");
    posts_scope.service(get_posts)
}
/// Handler for `GET` on the root of the `/posts` scope.
///
/// To produce varied traces, each request randomly picks one simulated outcome:
/// a 400/401/500/501 status is returned as an empty response with that status,
/// while any other pick fetches 20 posts (with comments) and returns them as
/// JSON.
///
/// # Errors
///
/// A failure while fetching posts is mapped to an internal server error.
#[instrument(level = "info", name = "get_posts", skip_all)]
#[get("")]
async fn get_posts() -> Result<HttpResponse, Error> {
    // Randomly simulate errors while handling the request.
    let choices = [200, 400, 401, 200, 500, 501, 200, 500];
    // The RNG handle is a temporary, so it is released before any `.await`.
    let choice = choices
        .choose(&mut rand::thread_rng())
        .copied()
        .expect("`choices` is a non-empty array");

    match choice {
        400..=401 | 500..=501 => {
            let status = StatusCode::from_u16(choice)
                .expect("simulated status codes are valid HTTP status codes");
            Ok(HttpResponse::new(status))
        }
        _ => {
            let posts = fetch_posts(20)
                .await
                .map_err(actix_web::error::ErrorInternalServerError)?;
            Ok(HttpResponse::Ok().json(posts))
        }
    }
}
/// Fetches posts from the remote API, keeps at most `limit` of them, and
/// attaches each post's comments.
///
/// Comments are requested one post at a time, in order.
///
/// # Errors
///
/// Returns the first error raised by the posts request or by any comments
/// request; posts fetched so far are discarded.
#[instrument(level = "info", name = "fetch_posts")]
async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> {
    let client = Client::new();
    let url = format!("{}/posts", BASE_API_URL);
    let mut posts: Vec<Post> = request_url(&client, &url).await?;
    posts.truncate(limit);

    // Fetch the comments of each post sequentially; mutating in place avoids
    // a separate (index, id) table and indexed writes.
    for post in &mut posts {
        post.comments = fetch_comments(&client, post.id).await?;
    }
    Ok(posts)
}
/// Fetches the comments of the post identified by `post_id`.
///
/// `client` is skipped in the tracing span; `post_id` is recorded as a span
/// field.
///
/// # Errors
///
/// Propagates any error returned by `request_url`.
#[instrument(level = "info", name = "fetch_comments", skip(client))]
async fn fetch_comments(client: &Client, post_id: i64) -> anyhow::Result<Vec<Comment>> {
    let url = format!("{}/posts/{}/comments", BASE_API_URL, post_id);
    // `client` is already a reference; the target type is inferred from the
    // return type.
    request_url(client, &url).await
}
// 发送get请求和反序列化json响应
async fn request_url<T: DeserializeOwned>(client: &Client, url: &str) -> anyhow::Result<T> {
let response = client.get(url)
.send()
.await?;
match response.status() {
reqwest::StatusCode::OK =>
response.json::<T>()
.await
.map_err(|err| anyhow!(err.to_string()))
,
_ => Err(anyhow!(format!("Request error with statusCode `{}`", response.status()))),
}
}
在上面的代码中,我们只发送跟踪数据。还可以使用功能强大的日志收集器收集日志,并将其发送到接收后端。
use actix_web::{App, HttpServer};
use tracing_actix_web::TracingLogger;
use web_api::{init_telemetry, fetch_posts_service};
const EXPORTER_ENDPOINT: &'static str = "http://localhost:7281";
/// Entry point: initializes telemetry, then serves the `/posts` API on
/// `127.0.0.1:9000` with request tracing enabled.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    init_telemetry(EXPORTER_ENDPOINT);

    // Each worker builds its own `App` from this factory.
    let app_factory = || {
        App::new()
            .wrap(TracingLogger::default())
            .service(fetch_posts_service())
    };

    let server = HttpServer::new(app_factory).bind(("127.0.0.1", 9000))?;
    server.run().await
}
执行cargo run,然后在浏览器中输入:http://127.0.0.1:9000/posts,如图:
# Local stack: Quickwit (trace storage), Jaeger UI and Grafana.
version: '3'
services:
  quickwit:
    image: quickwit/quickwit:latest
    command: run
    restart: always
    environment:
      QW_ENABLE_OTLP_ENDPOINT: true
      QW_ENABLE_JAEGER_ENDPOINT: true
    ports:
      # 7280 serves the REST search API and the Quickwit UI.
      - '7280:7280'
      # 7281 is the gRPC port targeted by the web app's OTLP exporter and by Jaeger.
      - '7281:7281'
    volumes:
      - ./qwdata:/quickwit/qwdata
  jaeger:
    image: jaegertracing/jaeger-query:latest
    restart: always
    depends_on:
      - quickwit
    environment:
      SPAN_STORAGE_TYPE: 'grpc-plugin'
      GRPC_STORAGE_SERVER: 'quickwit:7281'
    ports:
      - '16686:16686'
  grafana:
    image: grafana/grafana-enterprise:latest
    restart: always
    user: root
    depends_on:
      - quickwit
    environment:
      GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: 'quickwit-quickwit-datasource'
    ports:
      - '3000:3000'
    volumes:
      # The Quickwit datasource plugin is unpacked under ./grafana-storage/plugins.
      - ./grafana-storage:/var/lib/grafana
QW_ENABLE_OTLP_ENDPOINT:允许Quickwit接受和提取跟踪数据和日志数据。
下载并解压Quickwit的Grafana数据源插件:
wget https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.2.0/quickwit-quickwit-datasource-0.2.0.zip \
&& mkdir -p grafana-storage/plugins \
&& unzip quickwit-quickwit-datasource-0.2.0.zip -d grafana-storage/plugins
现在让我们运行以下命令启动所有服务(Quickwit, Jaeger, Grafana):
docker compose up -d
现在进入web-api目录,运行我们的web应用程序:
cargo run
执行如下命令:
curl -X GET http://localhost:9000/posts
等待大约10秒,新跟踪将被编入索引并可用于搜索。
curl -X POST http://localhost:7280/api/v1/otel-traces-v0_6/search -H 'Content-Type: application/json' -d '{ "query": "service_name:quickwit-jaeger-demo" }'
你也可以使用Quickwit UI查看数据:http://localhost:7280/ui/search。

/// Fetches posts from the remote API, keeps at most `limit` of them, and
/// attaches each post's comments, requesting the comments concurrently.
///
/// At most 10 comment requests are in flight at any time. Results arrive in
/// completion order, so each one carries the index of the post it belongs to.
///
/// # Errors
///
/// Returns the error of the posts request, or the first comments error
/// observed; requests still pending at that point are dropped.
#[instrument(level = "info", name = "fetch_posts")]
async fn fetch_posts(limit: usize) -> anyhow::Result<Vec<Post>> {
    let client = Client::new();
    let url = format!("{}/posts", BASE_API_URL);
    let mut posts: Vec<Post> = request_url(&client, &url).await?;
    posts.truncate(limit);

    // Build one future per post. Each future owns only the index and the post
    // id and shares the client by reference, so `posts` is free to be mutated
    // once the futures have been collected.
    let client = &client;
    let tasks: Vec<_> = posts
        .iter()
        .enumerate()
        .map(|(index, post)| {
            let post_id = post.id;
            async move { (index, fetch_comments(client, post_id).await) }
        })
        .collect();

    // Fetch the comments concurrently, at most 10 requests at a time.
    let mut stream = futures::stream::iter(tasks).buffer_unordered(10);
    while let Some((index, comments_fetch_result)) = stream.next().await {
        posts[index].comments = comments_fetch_result?;
    }
    Ok(posts)
}
重新运行程序,再次查看,发现时间缩短了。