• 如何基于Redis实现内部消息服务通信
  • 发布于 2个月前
  • 201 热度
    0 评论
前言
先说一下为什么要有这个东西,用消息中间件的好处就不用说了,日常开发中还是有很多场景需要用到消息传递的,消息的topic如何管理,如何约束topic,重要的topic消费记录、历史消息等就是这个sdk需要做的。本质上只是一层对消息中间件的封装。这次只是抛砖引玉只引入redis的三种消息类型,pubsub、queue以及stream。

扩展其他中间件按着代码思路一样。望各路大佬赐教

架构设计
一个消息服务sdk首先需要具备两个能力,即生产和消费,这两个功能离不开校验topic合法性,我们姑且简单点在mysql数据库中,但不可能每次校验topic是否合法都要去查询数据库,这里借鉴kafka存放topic信息的思想,找一个redis的key存放所有的topic列表。

定义一个核心service接口
public interface MessageHubService {

    /**
     * 生产消息
     */
    void producer(MessageForm messageForm);

    /**
     * 消费消息
     */
    void consumer(ConsumerAdapterForm adapterForm);

    /**
     * 检查topic、type合法性
     */
    void checkTopic(String topic, String type);
}
方法入参统一使用MessageForm类,里面定义一些基础的信息,比如哪个消息topic,哪个消息类型等等。
@Data
public class MessageForm {
    // 消息组件类型
    private String type;
    // 消息主题
    private String topic;
    private String message = "";
    // 消费者组
    private String group = "UPTOWN";
}
具体实现
基础类实现
@Service
public class MessageHubServiceImpl implements MessageHubService, ApplicationContextAware {

    @Resource
    protected StringRedisTemplate stringRedisTemplate;

    public Map<String, MessageHubService> messageHubServiceMap = new ConcurrentHashMap<>();

    private ApplicationContext applicationContext;


    @PostConstruct
    public void init() {

        messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE, applicationContext.getBean(RedisPubSubProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE, applicationContext.getBean(RedisQueueProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE, applicationContext.getBean(RedisStreamProcessor.class));
    }

    public void checkTopic(String topic, String type) {
        if (!messageHubServiceMap.containsKey(type)) {

            throw new MatrixException("消息类型不支持");
        }

        List<String> whiteTopicList = stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC, 0, -1);
        if ((!ObjectUtils.isEmpty(whiteTopicList) && !whiteTopicList.contains(topic)) || ObjectUtils.isEmpty(whiteTopicList)) {
            throw new MatrixException("当前topic未配置");
        }
    }

    @Override
    public void producer(MessageForm messageForm) {
        this.checkTopic(messageForm.getTopic(), messageForm.getType());
        this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm);
    }


    /**
    * 堆代码 duidaima.om
     * 消费者创建通过注解,已校验topic合法性
     */
    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
具体自实现类
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {

    @Override
    public void producer(MessageForm messageForm) {
        // 具体生产逻辑
    }

    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        // 具体消费逻辑
    }
}
生产者逻辑
生产者API做的比较简单,只是提供一个API调用,在调用前做一些校验工作,仅仅的是一条命令,不做发送失败的重试等操作。

消费者逻辑
消费者的话还是定义一个注解,通过借助SpringBoot生命周期扫描注解的方式在后台建立常驻线程的方式。
@Slf4j
@Component
public class ConsumerConfig implements DisposableBean, SmartInstantiationAwareBeanPostProcessor {


    @Resource(name = "messageHubServiceImpl")
    MessageHubService messageHubService;

    @Bean(name = "redisPubSubConsumerMap")
    public Map<String, MessageListenerAdapter> redisPubSubConsumerMap() {
        return new ConcurrentHashMap<>();
    }

    @Override
    public void destroy() throws Exception {

    }

    @Override
    public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException {
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
            if (annotation == null) {
                continue;
            }
            String resolveTopic = annotation.topic();
            try {
                messageHubService.checkTopic(resolveTopic, annotation.type());
            } catch (Exception e) {

                throw new Error(e.getMessage());
            }

            ConsumerAdapterForm adapterForm = new ConsumerAdapterForm();
            adapterForm.setBean(bean);
            adapterForm.setInvokeMethod(method);
            adapterForm.setTopic(resolveTopic);
            adapterForm.setType(annotation.type());
            adapterForm.setGroup(annotation.group());
            messageHubService.consumer(adapterForm);
        }
        return bean;
    }
}
这里依靠spring生命周期,拿到所有的bean,根据注解标注的方法去走不同的逻辑生成常驻线程,监听到消息之后回调到标注了注解的方法里。

Topic守护线程
@Slf4j
@Service
public class TopicReloadTask extends TimerTask {

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @Resource
    EntityManager entityManager;


    public final String TOPIC_SQL = " select * from MESSAGEHUB_TOPIC ";
    public final String LUA_SCRIPT =
                "redis.call('del', 'MESSAGEHUB_TOPIC')" +
                "local topics = KEYS " +
                "for i, v in pairs(topics) do " +
                "  redis.call('lpush', 'MESSAGEHUB_TOPIC', v) " +
                "end";


    @Override
    public void run() {
        try {
            List<String> topics = this.getQueryResult(TOPIC_SQL, MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList());
            if (!ObjectUtils.isEmpty(topics)) {

                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
                Long result = stringRedisTemplate.execute(redisScript, topics);
                log.info("reload topic finish");
            }
        } catch (Throwable t) {

            log.error("messagehub topic reload error", t);
        }
    }

    private <T> List<T> getQueryResult(String sql, Class<T> clazz) {

        Query dataQuery = entityManager.createNativeQuery(sql, clazz);
        List<T> result = new ArrayList<>();
        List<Object> list = dataQuery.getResultList();
        for (Object o : list) {
            result.add((T) o);
        }
        return result;
    }
}
定义一个timer任务,隔一段时间将mysql中的topic白名单通过lua脚本的方式刷新到指定的reids topic key中。还有一些可以优化的地方,比如同步topic的操作只需要一个服务即可,所以可以使用@ConditionalOnProperty注解判断是否需要进行同步topic。
用户评论