• Kafka有没有一个办法能让系统自动解析成自己想要的参数对象?
  • 发布于 1周前
  • 56 热度
    0 评论
咱们在写Kafka消费者的时候,有没有发现一个很麻烦的事:消费消息前每次都要手动解析Kafka消息,转换成自己想要的类型,再进行业务操作,比如:
/**
 * 订单支付成功通知
 */
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(ConsumerRecord<String, String> consumerRecord) {
  // 解析Kafka消息
OrderPaySuccessEvent event = JSON.parseObject(consumerRecord.value(), OrderPaySuccessEvent.class);
  // TODO 完成解析成功后的业务操作
}
对代码有洁癖的同志就比较难受了,每次解析的操作都差不多,但这又不是业务代码……于是你就想:有没有一个办法让系统能够自动解析成自己想要的参数对象呢?其实是有的,早在一年前,我写过一个组件,对kafka消费做了一层浅封装,也一直在工程中沿用到现在:Kafka消费者这样封装,一年节省10,000行代码!

不过也发现了一些难以解决的问题,比如:共用了一个groupId,使用Java线程池来管理,这样工程会存在性能瓶颈。专业的事还是交给专业的“人”来做吧,用Kafka组件本身来管理不同分组消费会更靠谱。Spring官方也发现这个问题,并且对此提出了解决方案——spring-messaging包下的HandlerMethodArgumentResolver接口。

我们从官方文档中看到,Spring-kafka在2.4.2版本支持了对kafka消息进行方法参数级别转换。其实,spring-web包下也有这个同名接口,用于自动解析Controller方法上的参数,这块网上资料比较多我就不详细展开了,而spring-messaging包下面这个接口的非官方资料比较少,我这里给大家总结一下用法。

HandlerMethodArgumentResolver方法参数处理器接口很简单,只有两个方法:
public interface HandlerMethodArgumentResolver {
    boolean supportsParameter(MethodParameter var1);

    @Nullable
    Object resolveArgument(MethodParameter var1, Message<?> var2) throws Exception;
}
supportsParameter方法返回是否支持参数自动解析,resolveArgument方法就是具体的解析逻辑,把MQ传递参数转换为具体类型参数。我们来看一下实际案例。
首先实现HandlerMethodArgumentResolver接口,定义为一个Spring的Component:
@Component
public class KafkaListenerMethodArgumentResolver implements HandlerMethodArgumentResolver {

    @Override
    public boolean supportsParameter(@NonNull MethodParameter parameter) {
      // 默认以com.xxx开头的类,这样可以不用在在参数前加@Payload注解
        return parameter.getParameterType().getName().startsWith("com.xxx") || parameter.hasParameterAnnotation(Payload.class);
    }

    @Override
    public Object resolveArgument(@NonNull MethodParameter parameter, @NonNull Message<?> message) {
        Class<?> parameterType = parameter.getParameterType();
        String messageContent = (String) message.getPayload();

        Object body;
        try {
          // 这里定义自己的解析方法
            body = JsonUtils.fromJson(messageContent, parameterType);
            Objects.requireNonNull(body);
        } catch (Throwable cause) {
            throw new KafkaException("kafka 消息解析失败: 非法JSON字符串", cause);
        }
// 可选,定义解析后的参数校验逻辑
        validate(parameter, body);
        return body;
    }

    private void validate(MethodParameter parameter, Object target) {
        for (Annotation ann : parameter.getParameterAnnotations()) {
            Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
            if (Objects.nonNull(validatedAnn) || ann.annotationType().getSimpleName().startsWith("Valid")) {
                ValidationUtils.valid(target);
            }
        }
    }
}
到这里,一个类就把参数自动解析搞定了,接下来咱们看看怎么用。
/**
 * 订单支付成功通知
 * 堆代码 duidaima.com
 */
@KafkaListener(topics = "oms.orderPaySuccess", groupId = "fms")
public void orderPaySuccess(@Payload OrderPaySuccessEvent orderPaySuccessEvent) {
  // 已经自动解析Kafka消息为orderPaySuccessEvent参数

  // TODO 完成解析成功后的业务操作
}
怎么样,代码是不是比原来简洁多了,香不香!如果你对技术有追求,不想一直写业务代码,不妨把项目中所有需要手动解析参数的地方,替换成自定义方法参数解析器来实现~
用户评论