• .NET Core中使用Polly实现熔断机制功能
  • 发布于 2个月前
  • 222 热度
    0 评论
概念解析
啥是熔断
而对于微服务来说,熔断就是我们常说的“保险丝”,意为当服务出现某些状况时,切断服务,从而防止应用程序不断地尝试执行可能会失败的操作造成系统的“雪崩”;或者大量的超时等待导致系统卡死等情况,很多地方也将其成为“过载保护”。

一个典型的应用场景:
双11”高峰部分消费者反应阿里系手机App发生宕机,这个报错的本质就是服务端流量过大,直接拒绝了部分请求;也就是“熔断”了,像保险丝一样。

啥是降级
降级的目的就是当某个服务提供者发生故障的时候,启用的一套备用的逻辑;通常有两种比较典型的做法:
1、是直接向调用方返回一个错误响应或者错误页面;
2、是执行备用/替代逻辑;

1比较容易理解;2的话,举个例子你有个发送短信的服务非常重要,但你只接入了阿里云短信服务,要是某天阿里云挂了你怎么办?那我再接入个便宜点腾讯云短信。没错这就是服务降级/回退;

可以看到降级主要做的是用户体验上的考虑,避免服务报错时直接UI/js报错卡住,点击没反应 等等功能/体验降级;

如何实现
根据前面的概念,我们知道服务熔断其实比较好做;服务的降级是一个备用的逻辑,如果每个功能都实现一套备用逻辑成本是非常高的(要写两套代码);所以服务降级我们比较常见到的是返回一个错误;

前端
1、写好请求拦截器,遇到各种后端未约定好的状态码;返回数据的格式;做到有对应的处理逻辑,该toast的toast;
2、404、500等错误页面要准备好,不能无端端空白页;特别是不能报后端的堆栈信息出来;
3、做好全局异常处理,最好配合做好异常埋点,做到故障有迹可循;故障体验也要考虑,避免js报错页面操作直接没反应;
后端
Net WebApi
1、写好异常过滤器(实现IExceptionFilter),不要直接响应500或抛堆栈信息到前端;
示例:略
2、处理好模型验证信息;

示例:
public static IServiceCollection ConfigureApiBehaviorOptions(this IServiceCollection services)
{
    services.Configure<ApiBehaviorOptions>(options =>
    {
        options.InvalidModelStateResponseFactory = (actionContext) =>
        {
           // 堆代码 duidaima.com
            var firstInvalidMsg = actionContext.ModelState?.Values.SelectMany(c => c.Errors).Select(c => c.ErrorMessage)?.FirstOrDefault();

            return new JsonResult(new ApiResult<object>()
            {
                Code = EnumStatus.Fail,
                Message = firstInvalidMsg ?? "参数验证失败"
            });
        };
    });
    return services;
}

第三方库Polly实现
Polly 是一个 .NET 弹性和瞬态故障处理库,允许开发人员以 Fluent 和线程安全的方式来实现重试、断路、超时、隔离、舱壁隔离、频率限制和回退策略。

首先这里的说的瞬态故障包含了程序发生的异常和出现不符合开发者预期的结果。所谓瞬态故障,就是说故障不是必然会发生的,而是偶然可能会发生的,比如网络偶尔会突然出现不稳定或无法访问这种故障。至于弹性,就是指应对故障 Polly 的处理策略具有多样性和灵活性,它的各种策略可以灵活地定义和组合。

抽象,重试、断路、超时、隔离、舱壁隔离、频率限制就是Polly的策略,我们一一介绍下;

先安装nuget
Install-Package Polly
项目地址:https://github.com/App-vNext/Polly

介绍
Polly 的异常处理策略的基本用法可以分为三个步骤
   Policy
        // 1. 指定要处理什么异常
        .Handle<HttpRequestException>()
        // 或者指定需要处理什么样的错误返回
        .OrResult<HttpResponseMessage>(r => r.StatusCode == HttpStatusCode.BadGateway)
        // 2. 指定重试次数和重试策略
        .Retry(3, (exception, retryCount, context) =>
        {
            Console.WriteLine($"开始第 {retryCount} 次重试:");
        })
        // 3. 执行具体任务
        .Execute(ExecuteMockRequest);
重试(Retry)
当我们服务依赖外部接口时,往往有接口瞬间故障问题,这个时刻就可以考虑重试策略;

// 重试一次
Policy
  .Handle<SomeExceptionType>()
  .Retry()

// 重试3次
Policy
  .Handle<SomeExceptionType>()
  .Retry(3)

// 重试3次,并在重试时执行逻辑
Policy
  .Handle<SomeExceptionType>()
  .Retry(3, onRetry: (exception, retryCount) =>
  {
      // Add log
  });

// 重试3次,并在重试时执行逻辑时携带上下文
Policy
  .Handle<SomeExceptionType>()
  .Retry(3, onRetry: (exception, retryCount, context) =>
  {
      // 每次重试时执行逻辑,比如写日志
  });
一直重试

// 一直重试
Policy
  .Handle<SomeExceptionType>()
  .RetryForever()

// 一直重试,同时执行逻辑
Policy
  .Handle<SomeExceptionType>()
  .RetryForever(onRetry: exception =>
  {
        // Add logic
  });

// 一直重试,同时执行逻辑(参数不一样)
Policy
  .Handle<SomeExceptionType>()
  .RetryForever(onRetry: (exception, context) =>
  {
        // Add logic 
  });
更多...
超时(TimeOut)
当系统超过一定时间的等待,我们就几乎可以判断不可能会有成功的结果。比如平时一个网络请求瞬间就完成了,如果有一次网络请求超过了 30 秒还没完成,我们就知道这次大概率是不会返回成功的结果了。因此,我们需要设置系统的超时时间,避免系统无限等待。
// 执行30秒后超时
Policy
  .Timeout(30)

//  timespan做超时时间.
Policy
  .Timeout(TimeSpan.FromMilliseconds(2500))

// 动态超时时间
Policy
  .Timeout(() => myTimeoutProvider)) // Func<TimeSpan> myTimeoutProvider

// 超时时,执行特定的逻辑
Policy
  .Timeout(30, onTimeout: (context, timespan, task) =>
    {
        // Add extra logic to be invoked when a timeout occurs, such as logging
    });

//示例: 超时时,记录存在状态
Policy
  .Timeout(30, onTimeout: (context, timespan, task) =>
    {
        logger.Warn($"{context.PolicyKey} at {context.OperationKey}: execution timed out after {timespan.TotalSeconds} seconds.");
    });

//  示例:当超时的任务完成时,捕获来自超时任务的异常。
Policy
  .Timeout(30, onTimeout: (context, timespan, task) =>
    {
        task.ContinueWith(t => {
            if (t.IsFaulted) logger.Error($"{context.PolicyKey} at {context.OperationKey}: execution timed out after {timespan.TotalSeconds} seconds, with: {t.Exception}.");
        }); 
    });
更多...
回退(Fallback)
当出现故障,则进入降级动作。很常见的一个场景是,当用户没有上传头像时,我们就给他一个默认头像。
// 当没有用户头像时,用默认头像
Policy
   .Handle<UserAvatar>()
   .OrResult(null)
   .Fallback<UserAvatar>(() => UserAvatar.GetDefaultAvatar())

// 当然,出现异常的时候也可以回退
Policy<UserAvatar>
   .Handle<FooException>()
   .OrResult(null)
   .Fallback<UserAvatar>(() => UserAvatar.GetDefaultAvatar()) 

// 执行回退策略的时候,执行逻辑(比如写个日志)
Policy<UserAvatar>
   .Handle<FooException>()
   .Fallback<UserAvatar>(UserAvatar.Blank, onFallback: (exception, context) =>
    {
        // Add logging
    });
断路(Circuit-breaker)
我们服务也会依赖外部接口,有的时候外部接口负载很高的时候,响应很慢的时候。可以考虑使用断路器,阻断一定时间内对这个外部接口的调用逻辑;减轻第三方接口压力,起短路器的作用;
//出现某个异常两次时,断路一分钟
Policy
    .Handle<SomeExceptionType>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1));

//出现某个异常两次时,断路一分钟;
//当触发断路,断路恢复时,执行对应的逻辑;
Action<Exception, TimeSpan> onBreak = (exception, timespan) => { ... }; //断路逻辑
Action onReset = () => { ... }; //断路恢复逻辑
CircuitBreakerPolicy breaker = Policy
    .Handle<SomeExceptionType>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1), onBreak, onReset);

//出现某个异常两次时,断路一分钟;
//当触发断路,断路恢复时,携带上下文 执行对应的逻辑;
Action<Exception, TimeSpan, Context> onBreak = (exception, timespan, context) => { ... };
Action<Context> onReset = context => { ... };
CircuitBreakerPolicy breaker = Policy
    .Handle<SomeExceptionType>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1), onBreak, onReset);

//获取回路状态
CircuitState state = breaker.CircuitState;

/*
*断路器状态释义
CircuitState.Closed - 正常状态,可以执行动作;
CircuitState.Open - 启动断路器,业务逻辑动作的执行被阻止.
CircuitState.HalfOpen - 当开启状态过期后,逻辑动作已经可以执行。这个时候接下来的状态将会根据动作的执行为开启或关闭;
CircuitState.Isolated - 断路器被独立地设置为开启状态,并保持开启.,业务逻辑动作的执行被阻止.

//手动开启一个断路器,并保证开启状态;比如手动隔离下游服务
breaker.Isolate();
//重置断路器到closed状态,以便再次执行动作
breaker.Reset();
更多...
频率限制(Rate-Limit)
限制一段代码的执行频率;
//每秒钟执行不能超过20次
Policy.RateLimit(20, TimeSpan.FromSeconds(1));

// 每秒钟执行不能超过20次,且不能连续执行超过10次
Policy.RateLimit(20, TimeSpan.FromSeconds(1), 10);

// 每秒钟执行不能超过20次,如果超过之后执行一段逻辑,并设置下次重试时间
Policy.RateLimit(20, TimeSpan.FromSeconds(1), (retryAfter, context) =>
{
    return retryAfter.Add(TimeSpan.FromSeconds(2));
});

// 每秒钟执行不能超过20次,且不能连续执行超过10次,如果超过之后执行一段逻辑,并设置下次重试时间
Policy.RateLimit(20, TimeSpan.FromSeconds(1), 10, (retryAfter, context) =>
{
    return retryAfter.Add(TimeSpan.FromSeconds(2));
});
更多...
舱壁隔离(Bulkhead Isolation)
当系统的一处出现故障时,可能促发多个失败的调用,很容易耗尽主机的资源(如 CPU)。下游系统出现故障可能导致上游的故障的调用,甚至可能蔓延到导致系统崩溃。

所以要将可控的操作限制在一个固定大小的资源池中,以隔离有潜在可能相互影响的操作。
// 最多允许 12 个线程并发执行
Policy
  .Bulkhead(12)

// 最多允许 12 个线程并发执行
// 如果所有的线程都被占用后,有两个等待执行槽
Policy
  .Bulkhead(12, 2)

// 限制并发后,调用一个委托
Policy
  .Bulkhead(12, context =>
    {
        // 比如记日志
    });

//监控隔离仓的可用资源
var bulkhead = Policy.Bulkhead(12, 2);
// ...
int freeExecutionSlots = bulkhead.BulkheadAvailableCount;
int freeQueueSlots     = bulkhead.QueueAvailableCount;
更多...
缓存(Cache)
一般我们会把频繁使用且不会怎么变化的资源缓存起来,以提高系统的响应速度。如果不对缓存资源的调用进行封装,那么我们调用的时候就要先判断缓存中有没有这个资源,有的话就从缓存返回,否则就从资源存储的地方(比如数据库)获取后缓存起来,再返回,而且有时还要考虑缓存过期和如何更新缓存的问题。Polly 提供了缓存策略的支持,使得问题变得简单。
var memoryCache = new MemoryCache(new MemoryCacheOptions());
var memoryCacheProvider = new MemoryCacheProvider(memoryCache);
var cachePolicy = Policy.Cache(memoryCacheProvider, TimeSpan.FromMinutes(5));
定义一个绝对缓存时间一天的缓存
var cachePolicy = Policy.Cache(memoryCacheProvider, new AbsoluteTtl(DateTimeOffset.Now.Date.AddDays(1));
一个滑动缓存时间5分钟的缓存                               
var cachePolicy = Policy.Cache(memoryCacheProvider, new SlidingTtl(TimeSpan.FromMinutes(5));
定义一个缓存提供程序报错后可以记录日志或执行逻辑的缓存
var cachePolicy = Policy.Cache(myCacheProvider, TimeSpan.FromMinutes(5),
   (context, key, ex) => {
       logger.Error($"Cache provider, for key {key}, threw exception: {ex}."); // (for example)
   }
);

// Execute through the cache as a read-through cache: check the cache first; if not found, execute underlying delegate and store the result in the cache.
// The key to use for caching, for a particular execution, is specified by setting the OperationKey (before v6: ExecutionKey) on a Context instance passed to the execution. Use an overload of the form shown below (or a richer overload including the same elements).
// Example: "FooKey" is the cache key that will be used in the below execution.
缓存到FooKey的key里面,你
TResult result = cachePolicy.Execute(context => getFoo(), new Context("FooKey"));
更多...

AspectCore + Polly 的AOP实现
从上面来看,我们在代码里面使用Polly会产生很多重复代码,影响可维护性;接下来我们借助AspectCore + Polly 封装了一个包,然后针对需要熔断降级的函数,直接在函数上打标签即可;

安装包
Install-Package Hei.Hystrix
在program.cs里面启用
按不同需求配置启用即可
//只启用内存缓存
builder.Services.AddHeiHystrix();

//启用内存缓存和redis缓存
builder.Services.AddHeiHystrix(o =>
{
    o.RedisConnectionString = AppSettings.GetConnectionString("Redis");
});

//启用内存缓存和redis缓存,且要修改缓存数据的序列化配置
builder.Services.AddHeiHystrix(o =>
{
    o.RedisConnectionString = AppSettings.GetConnectionString("Redis");
    o.JsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web)
    {
        MaxDepth = 64,
        PropertyNameCaseInsensitive = false,
        DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull
    };
});

使用
接口的话,标签直接打到接口成员上;否则,标签直接打到函数上;
//先写个回退方法
Task<string> MyFallback();

/// <summary>
/// 增加回退逻辑
/// </summary>
/// <returns></returns>
[HeiHystrix(nameof(MyFallback))]
Task OnlyFallback();

/// <summary>
/// 熔断处理
/// </summary>
/// <returns></returns>
[HeiHystrix(nameof(MyFallback), EnableCircuitBreaker = true, ExceptionsAllowedBeforeBreaking = 3, MillisecondsOfBreak = 10 * 1000)] //ExceptionsAllowedBeforeBreaking=熔断前执行3次,每次熔断10秒
Task CircuitBreaker();

/// <summary>
/// 超时处理
/// </summary>
/// <returns></returns>
//[HeiHystrix(nameof(MyFallback), TimeOutMilliseconds = 1*1000)]
[HeiHystrix(nameof(MyFallback), TimeOutMilliseconds = 2 * 1000)]
Task<string> TimeOut();

/// <summary>
/// 重试
/// </summary>
/// <returns></returns>
//[HeiHystrix(nameof(Retry), MaxRetryTimes = 1, RetryIntervalMilliseconds = 0)]
[HeiHystrix(nameof(MyFallback), MaxRetryTimes = 1, RetryIntervalMilliseconds = 4 * 1000)] //重试1次,重试间隔4秒
Task<string> Retry();

/// <summary>
/// 缓存
/// </summary>
/// <returns></returns>
// [HeiHystrix(nameof(MyFallback), CacheTTLSeconds = 5)]//内存缓存,有fallback逻辑,缓存5秒
// [HeiHystrix(CacheTTLMinutes = 2)] //内存缓存,缓存2分钟
[HeiHystrix(CacheType = CacheTypeEnum.Redis, CacheTTLMinutes = 2)]//redis缓存,2分钟
Task<List<string>> CacheDataAsync();
这是接口的实现
public async Task<string> MyFallback()
{
    var msg = "MyFallback Executed!!!!!!!!!!!!!!!!!!";
    Console.WriteLine(msg);
    return msg;
}

public async Task OnlyFallback()
{
    Console.WriteLine("执行熔断方法 OnlyFallback");
    throw new Exception("fallback异常");
}

public async Task CircuitBreaker()
{
    Console.WriteLine("执行熔断方法CircuitBreaker");
    throw new Exception("熔断异常");
}

public async Task<string> TimeOut()
{
    Console.WriteLine("执行timeOut方法");
    await Task.Delay(2 * 1000);
    return "执行timeOut方法";
}

public async Task<string> Retry()
{
    Console.WriteLine("执行方法Retry");
    throw new Exception("重试异常");
    return "执行方法Retry";
}

public void CacheVoid()
{
    Console.WriteLine("执行缓存CacheVoid" + DateTime.Now.ToString());
}

public async Task CacheTask()
{
    Console.WriteLine("执行缓存CacheVoid" + DateTime.Now.ToString());
}

public async Task<List<string>> CacheDataAsync()
{
    var datatime = DateTime.Now.ToString();
    Console.WriteLine("执行缓存CacheData" + datatime);
    return new List<string>
    {
        datatime,
        new Random().Next(1,10000).ToString()
    };
}

总结
最后的nuget包其实总体上是基于杨老师的代码简单改了下,加上了比较常用的redis缓存;然后redis缓存序列化这块也基本是“致敬”一念大佬的这个项目(github/NCache) ,大家可以点个星;

然后还有批量限制,舱壁隔离等,我目前需求不多 暂不加,后续看需要更新。
用户评论