Dapr 被设计成一个面向开发者的企业级微服务编程平台,它独立于具体的技术平台,可以运行在“任何地方”。Dapr本身并不提供“基础设施(infrastructure)”,而是利用自身的扩展来适配具体的部署环境。就目前的状态来说,如果希望真正将原生的Dapr应用于生产,只能部署在K8S环境下。虽然Dapr也提供针对Hashicorp Consul的支持,但是目前貌似没有稳定的版本支持。
Kubernetes对于很多公司并非“标配”,由于某些原因,它们可以具有一套自研的微服务平台或者弹性云平台,让Dapr与之适配可能更有价值。这两周我们对此作了一些可行性研究,发现这其实不难,记下来我们就通过一个非常简单的实例来介绍一下大致的解决方案。
从部署的角度来看,Dapr的所有功能都体现在与应用配对的Sidecar上。我们进行服务调用的时候只需要指定服务所在的目标应用的ID(AppID)就可以了。服务请求(HTTP或者gRPC)从应用转到sidecar,后者会将请求“路由”到合适的节点上。如果部署在Kubernetes集群上,如果指定了目标服务的标识和其他相关的元数据(命名空间和集群域名等),服务请求的寻址就不再是一个问题。
实际上NameResolution组件体现的针对“名字(Name)”的“解析(Resolution)”解决的就是如将Dapr针对应用的标识AppID转换成基于部署环境的应用标识的问题。从dapr提供的代码来看,它目前注册了如下3种类型的NameResolution组件:
mdns:利用mDNS(Multicast DNS)实现服务注册与发现,如果没有显式配置,默认使用的就是此类型。由于mDNS仅仅是在小规模网络中采用广播通信实现的一种DNS,所以根本不适合正式的生成环境。一个注册的NameResolution组件旨在提供一个Resolver对象,该对象通过如下的接口来表示。如下面的代码片段所示,Resolver接口提供两个方法,Init方法会在应用启动的时候调用,作为参数的Metadata会携带于当前应用实例相关的元数据(包括应用标识和端口,以及Sidecar的HTTP和gRPC端口等)和针对当前NameResolution组件的配置。
对于每一次服务调用,目标应用标识和命名空间等相关信息会被Sidecar封装成一个ResolveRequest 接口,并最为参数调用Resolver对象的ReolveID方法,最终得到一个于当前部署环境相匹配的表示,并利用此标识借助基础设施的利用完整目标服务的调用。
package nameresolution // 堆代码 duidaima.com type Resolver interface { Init(metadata Metadata) error ResolveID(req ResolveRequest) (string, error) } type Metadata struct { Properties map[string]string `json:"properties"` Configuration interface{} } type ResolveRequest struct { ID string Namespace string Port int Data map[string]string }三、模拟服务注册与负载均衡
public class HomeController: Controller { private static readonly ConcurrentDictionary<string, EndpointCollection> _applications = new(); [HttpPost("/register")] public IActionResult Register( [FromBody] RegisterRequest request) { var appId = request.Id; var endpoints = _applications.TryGetValue( appId, out var value) ? value : _applications[appId] = new(); endpoints.TryAdd(request.HostAddress, request.Port); Console.WriteLine($"Register {request.Id} =>{request.HostAddress}:{request.Port}"); return Ok(); } [HttpPost("/resolve")] public IActionResult Resolve( [FromBody] ResolveRequest request) { if (_applications.TryGetValue( request.ID, out var endpoints) && endpoints.TryGet(out var endpoint)) { Console.WriteLine( $"Resolve app {request.ID} =>{endpoint}"); return Content(endpoint!); } return NotFound(); } } public class EndpointCollection { private readonly List<string> _endpoints = new(); private int _index = 0; private readonly object _lock = new(); public bool TryAdd(string ipAddress, int port) { lock (_lock) { var endpoint = $"{ipAddress}:{port}"; if (_endpoints.Contains(endpoint)) { return false; } _endpoints.Add(endpoint); return true; } } public bool TryGet(out string? endpoint) { lock (_lock) { if (_endpoints.Count == 0) { endpoint = null; return false; } _index++; if (_index >= _endpoints.Count) { _index = 0; } endpoint = _endpoints[_index]; return true; } } }HomeController提供了两个Action方法,Register方法用来注册应用,自定义Resolver的Init方法会调用它。另一个方法Resolve则用来完成根据请求的应用表示得到一个具体的终结点,自定义Resolver的ResolveID方法会调用它。这两个方法的参数类型RegisterRequest和ResolveRequest定义如下,后者和前面给出的同名接口具有一致的定义。两个Action都会在控制台输出相应的文字显示注册的应用信息和解析出来的终结点。
public class RegisterRequest { public string Id { get; set; } = default!; public string HostAddress { get; set; } = default!; public int Port { get; set; } } public class ResolveRequest { public string ID { get; set; } = default!; public string? Namespace { get; set; } public int Port { get; } public Dictionary<string, string> Data { get; } = new(); }四、自定义NameResolution组件
package svcreg import ( "bytes" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "strconv" "github.com/dapr/components-contrib/nameresolution" "github.com/dapr/kit/logger" ) type Resolver struct { logger logger.Logger registerEndpoint string resolveEndpoint string } type RegisterRequest struct { Id, HostAddress string Port int64 } func (resolver *Resolver) Init( metadata nameresolution.Metadata) error { var endpoint, appId, hostAddress string var ok bool // Extracts register & resolve endpoint if dic, ok := metadata.Configuration .(map[interface{}]interface{}); ok { endpoint = fmt.Sprintf( "%s", dic["endpointAddress"]) resolver.registerEndpoint = fmt.Sprintf("%s/register", endpoint) resolver.resolveEndpoint = fmt.Sprintf("%s/resolve", endpoint) } if endpoint == "" { return errors.New( "service registry endpoint is not configured") } // Extracts AppID, HostAddress and Port props := metadata.Properties if appId, ok = props[nameresolution.AppID]; !ok { return errors.New( "AppId does not exist in the name resolution metadata") } if hostAddress, ok = props[nameresolution.HostAddress]; !ok { return errors.New( "HostAddress does not exist in the name resolution metadata") } p, ok := props[nameresolution.DaprPort] if !ok { return errors.New( "DaprPort does not exist in the name resolution metadata") } port, err := strconv.ParseInt(p, 10, 32) if err != nil { return errors.New("DaprPort is invalid") } // Register service (application) var request = RegisterRequest{appId, hostAddress, port} payload, err := json.Marshal(request) if err != nil { return errors.New("fail to marshal register request") } _, err = http.Post(resolver.registerEndpoint, "application/json", bytes.NewBuffer(payload)) if err == nil { resolver.logger.Infof( "App '%s (%s:%d)' is successfully registered.", request.Id, request.HostAddress, request.Port) } return err } func (resolver *Resolver) ResolveID( req nameresolution.ResolveRequest) (string, error) { payload, err := json.Marshal(req) if err != nil { return "", err } response, err := http.Post( resolver.resolveEndpoint, "application/json", bytes.NewBuffer(payload)) if err != nil { return "", err } defer response.Body.Close() result, err := ioutil.ReadAll(response.Body) if err != nil { return "", err } return string(result), nil } func NewResolver(logger logger.Logger) *Resolver { return &Resolver{ logger: logger, } }如上面的代码片段所示,我们定义核心的Resolver结构,该接口除了具有一个用来记录日志的logger字段,还有两个额外的字段registerEndpoint和resolveEndpoint,分别代表ServiceRegistry提供的两个API的URL。在为Resolver结构实现的Init方法中,我们从作为参数的元数据中提取出配置,并进一步从配置中提取出ServiceRegistry的地址,并在此基础上添加路由路径“/register”和“/resolve”对Resolver结构的registerEndpoint和resolveEndpoint字段进行初始化。接下来我们从元数据中提取出AppID、IP地址和内部gRPC端口号(外部应用通过此端口调用当前应用的Sidecar),它们被封装成RegisterRequest结构之后被序列化成JSON字符串,并作为输入调用对应的Web API完成对应的服务注册。
// Name resolutions. nr "github.com/dapr/components-contrib/nameresolution" nr_consul "github.com/dapr/components-contrib/nameresolution/consul" nr_kubernetes "github.com/dapr/components-contrib/nameresolution/kubernetes" nr_mdns "github.com/dapr/components-contrib/nameresolution/mdns" nr_svcreg "github.com/dapr/components-contrib/nameresolution/svcreg"在main函数中,我们找到用来注册NameResolution组件的那部分代码,按照其他NameResolution组件注册那样,依葫芦画瓢完成针对svcreg的注册即可。注册代码中用来提供Resolver的NewResolver函数定义在上述的svcreg.go文件中。
runtime.WithNameResolutions( nr_loader.New("svcreg", func() nr.Resolver { return nr_svcreg.NewResolver(logContrib) }), nr_loader.New("mdns", func() nr.Resolver { return nr_mdns.NewResolver(logContrib) }), nr_loader.New("kubernetes", func() nr.Resolver { return nr_kubernetes.NewResolver(logContrib) }), nr_loader.New("consul", func() nr.Resolver { return nr_consul.NewResolver(logContrib) }), ),六、编译部署daprd.exe
replace ( go.opentelemetry.io/otel => go.opentelemetry.io/otel v0.20.0 gopkg.in/couchbaselabs/gocbconnstr.v1 => github.com/couchbaselabs/gocbconnstr v1.0.5 k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36 github.com/dapr/components-contrib => ../components-contrib )在将当前目录切换到“dapr/cmd/daprd/”后,以命令行的方式执行“go build”后会在当前目录下生成一个daprd.exe可执行文件。现在我们需要使用这个新的daprd.exe将当前使用使用的替换掉,该文件所在的目录在“%userprofile%.dapr\bin”。
apiVersion: dapr.io/v1alpha1 kind: Configuration metadata: name: daprConfig spec: nameResolution: component: "svcreg" configuration: endpointAddress: http://127.0.0.1:3721 tracing: samplingRate: "1" zipkin: endpointAddress: http://localhost:9411/api/v2/spans八、测试效果
using Microsoft.AspNetCore.Mvc; using Shared; var app = WebApplication.Create(args); app.MapPost("{method}", Calculate); app.Run("http://localhost:9999"); static IResult Calculate( string method, [FromBody] Input input) { var result = method.ToLower() switch { "add" => input.X + input.Y, "sub" => input.X - input.Y, "mul" => input.X * input.Y, "div" => input.X / input.Y, _ => throw new InvalidOperationException($"Invalid method {method}") }; return Results.Json(new Output { Result = result }); } public class Input { public int X { get; set; } public int Y { get; set; } } public class Output { public int Result { get; set; } public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.Now; }具有如下定义的App1是一个控制台程序,它利用Dapr客户端SDK调用了上诉四个API。
using Dapr.Client; using Shared; HttpClient client = DaprClient .CreateInvokeHttpClient(appId: "app2"); var input = new Input(2, 1); await InvokeAsync("add", "+"); await InvokeAsync("sub", "-"); await InvokeAsync("mul", "*"); await InvokeAsync("div", "/"); async Task InvokeAsync(string method, string @operator) { var response = await client .PostAsync(method, JsonContent.Create(input)); var output = await response .Content.ReadFromJsonAsync<Output>(); Console.WriteLine( $"{input.X} {@operator} {input.Y} = {output.Result} ({output.Timestamp})"); }在启动ServiceRegistry之后,我们启动App2,控制台上会阐述如下的输出。从输出的NameResolution组件名称可以看出,我们自定义的svcreg正在被使用。