闽公网安备 35020302035485号
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{
//return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions
//{
// SingleReader = true,
// AllowSynchronousContinuations = false,
//});
return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10)
{
SingleReader = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
});
这里我们有两种选择:AllowSynchronousContinuations:是否允许同步执行回调。如果设为 false,所有等待的任务都会被异步调度,避免线程被阻塞。
.任务执行时间较短,不容易积累大量待处理数据。
.需要对通道满时的行为进行控制,例如优先保留新数据,或让生产者等待。
| 特性 | UnboundedChannelOptions | BoundedChannelOptions |
|---|---|---|
| 是否限制容量 | ❌ 无限制 | ✅ 受限(如 10 条消息) |
| 适用于高吞吐 | ✅ 适合 | 🚫 可能会影响 |
| 可能导致内存占满 | ✅ 可能 | 🚫 不太可能 |
| 生产者快于消费者时 | ❌ 可能导致 OOM | ✅ 可以限制 |
| 配置通道满时行为 | 🚫 无法配置 | ✅ 可配置 FullMode |
| 适用场景 | 生产者、消费者速度匹配 | 生产者速度可能远超消费者 |
2.如果你确定不会有大量数据积压,或者消费能力能跟上,UnboundedChannelOptions 也可以。
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{
return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions
{
SingleReader = true,
AllowSynchronousContinuations = false,
});
});
这里就没有了 BoundedChannelOptions 里的 FullMode,因为它本身不会满。但是如果生产者不断写入,消费者来不及消费,可能会导致内存占用越来越大。
using System.Threading.Channels;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHostedService<VectorService>();
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{
//return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions
//{
// SingleReader = true,
// AllowSynchronousContinuations = false,
//});
return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10)
{
// 堆代码 duidaima.com
SingleReader = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
});
var app = builder.Build();
app.MapGet("/vector", async (Channel<VectorData> channel) =>
{
await channel.Writer.WriteAsync(new VectorData($"这里是用户内容类信息,{DateTime.Now.ToString("yyyyMMddHHmmssfff")}"));
return Results.Ok();
});
app.Run();
public record VectorData(string content);
public class VectorService : BackgroundService
{
private readonly Channel<VectorData> _channel;
public VectorService(Channel<VectorData> channel)
{
_channel = channel;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (await _channel.Reader.WaitToReadAsync(stoppingToken))
{
var vectorData = await _channel.Reader.ReadAsync(stoppingToken);
Console.WriteLine($"开始向量化处理:{vectorData.content}");
await Task.Delay(3000, stoppingToken);
Console.WriteLine($"向量化处理结束");
}
}
}