builder.Services.AddSingleton<Channel<VectorData>>(_ => { //return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions //{ // SingleReader = true, // AllowSynchronousContinuations = false, //}); return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait, AllowSynchronousContinuations = false, }); });这里我们有两种选择:
AllowSynchronousContinuations:是否允许同步执行回调。如果设为 false,所有等待的任务都会被异步调度,避免线程被阻塞。
.任务执行时间较短,不容易积累大量待处理数据。
.需要对通道满时的行为进行控制,例如优先保留新数据,或让生产者等待。
特性 | UnboundedChannelOptions | BoundedChannelOptions |
---|---|---|
是否限制容量 | ❌ 无限制 | ✅ 受限(如 10 条消息) |
适用于高吞吐 | ✅ 适合 | 🚫 可能会影响 |
可能导致内存占满 | ✅ 可能 | 🚫 不太可能 |
生产者快于消费者时 | ❌ 可能导致 OOM | ✅ 可以限制 |
配置通道满时行为 | 🚫 无法配置 | ✅ 可配置 FullMode |
适用场景 | 生产者、消费者速度匹配 | 生产者速度可能远超消费者 |
2.如果你确定不会有大量数据积压,或者消费能力能跟上,UnboundedChannelOptions 也可以。
builder.Services.AddSingleton<Channel<VectorData>>(_ => { return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions { SingleReader = true, AllowSynchronousContinuations = false, }); });
这里就没有了 BoundedChannelOptions 里的 FullMode,因为它本身不会满。但是如果生产者不断写入,消费者来不及消费,可能会导致内存占用越来越大。
using System.Threading.Channels; var builder = WebApplication.CreateBuilder(args); builder.Services.AddHostedService<VectorService>(); builder.Services.AddSingleton<Channel<VectorData>>(_ => { //return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions //{ // SingleReader = true, // AllowSynchronousContinuations = false, //}); return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10) { // 堆代码 duidaima.com SingleReader = true, FullMode = BoundedChannelFullMode.Wait, AllowSynchronousContinuations = false, }); }); var app = builder.Build(); app.MapGet("/vector", async (Channel<VectorData> channel) => { await channel.Writer.WriteAsync(new VectorData($"这里是用户内容类信息,{DateTime.Now.ToString("yyyyMMddHHmmssfff")}")); return Results.Ok(); }); app.Run(); public record VectorData(string content); public class VectorService : BackgroundService { private readonly Channel<VectorData> _channel; public VectorService(Channel<VectorData> channel) { _channel = channel; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (await _channel.Reader.WaitToReadAsync(stoppingToken)) { var vectorData = await _channel.Reader.ReadAsync(stoppingToken); Console.WriteLine($"开始向量化处理:{vectorData.content}"); await Task.Delay(3000, stoppingToken); Console.WriteLine($"向量化处理结束"); } } }