前言
先说一下为什么要有这个东西,用消息中间件的好处就不用说了,日常开发中还是有很多场景需要用到消息传递的,消息的topic如何管理,如何约束topic,重要的topic消费记录、历史消息等就是这个sdk需要做的。本质上只是一层对消息中间件的封装。这次只是抛砖引玉只引入redis的三种消息类型,pubsub、queue以及stream。
扩展其他中间件按着代码思路一样。望各路大佬赐教
架构设计
一个消息服务sdk首先需要具备两个能力,即生产和消费,这两个功能离不开校验topic合法性,我们姑且简单点在mysql数据库中,但不可能每次校验topic是否合法都要去查询数据库,这里借鉴kafka存放topic信息的思想,找一个redis的key存放所有的topic列表。
定义一个核心service接口
public interface MessageHubService {
/**
* 生产消息
*/
void producer(MessageForm messageForm);
/**
* 消费消息
*/
void consumer(ConsumerAdapterForm adapterForm);
/**
* 检查topic、type合法性
*/
void checkTopic(String topic, String type);
}
方法入参统一使用MessageForm类,里面定义一些基础的信息,比如哪个消息topic,哪个消息类型等等。
@Data
public class MessageForm {
// 消息组件类型
private String type;
// 消息主题
private String topic;
private String message = "";
// 消费者组
private String group = "UPTOWN";
}
具体实现
基础类实现
@Service
public class MessageHubServiceImpl implements MessageHubService, ApplicationContextAware {
@Resource
protected StringRedisTemplate stringRedisTemplate;
public Map<String, MessageHubService> messageHubServiceMap = new ConcurrentHashMap<>();
private ApplicationContext applicationContext;
@PostConstruct
public void init() {
messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE, applicationContext.getBean(RedisPubSubProcessor.class));
messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE, applicationContext.getBean(RedisQueueProcessor.class));
messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE, applicationContext.getBean(RedisStreamProcessor.class));
}
public void checkTopic(String topic, String type) {
if (!messageHubServiceMap.containsKey(type)) {
throw new MatrixException("消息类型不支持");
}
List<String> whiteTopicList = stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC, 0, -1);
if ((!ObjectUtils.isEmpty(whiteTopicList) && !whiteTopicList.contains(topic)) || ObjectUtils.isEmpty(whiteTopicList)) {
throw new MatrixException("当前topic未配置");
}
}
@Override
public void producer(MessageForm messageForm) {
this.checkTopic(messageForm.getTopic(), messageForm.getType());
this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm);
}
/**
* 堆代码 duidaima.om
* 消费者创建通过注解,已校验topic合法性
*/
@Override
public void consumer(ConsumerAdapterForm messageForm) {
this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
具体自实现类
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
@Override
public void producer(MessageForm messageForm) {
// 具体生产逻辑
}
@Override
public void consumer(ConsumerAdapterForm messageForm) {
// 具体消费逻辑
}
}
生产者逻辑
生产者API做的比较简单,只是提供一个API调用,在调用前做一些校验工作,仅仅的是一条命令,不做发送失败的重试等操作。
消费者逻辑
消费者的话还是定义一个注解,通过借助SpringBoot生命周期扫描注解的方式在后台建立常驻线程的方式。
@Slf4j
@Component
public class ConsumerConfig implements DisposableBean, SmartInstantiationAwareBeanPostProcessor {
@Resource(name = "messageHubServiceImpl")
MessageHubService messageHubService;
@Bean(name = "redisPubSubConsumerMap")
public Map<String, MessageListenerAdapter> redisPubSubConsumerMap() {
return new ConcurrentHashMap<>();
}
@Override
public void destroy() throws Exception {
}
@Override
public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
for (Method method : methods) {
MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
continue;
}
String resolveTopic = annotation.topic();
try {
messageHubService.checkTopic(resolveTopic, annotation.type());
} catch (Exception e) {
throw new Error(e.getMessage());
}
ConsumerAdapterForm adapterForm = new ConsumerAdapterForm();
adapterForm.setBean(bean);
adapterForm.setInvokeMethod(method);
adapterForm.setTopic(resolveTopic);
adapterForm.setType(annotation.type());
adapterForm.setGroup(annotation.group());
messageHubService.consumer(adapterForm);
}
return bean;
}
}
这里依靠spring生命周期,拿到所有的bean,根据注解标注的方法去走不同的逻辑生成常驻线程,监听到消息之后回调到标注了注解的方法里。
Topic守护线程
@Slf4j
@Service
public class TopicReloadTask extends TimerTask {
@Resource
StringRedisTemplate stringRedisTemplate;
@Resource
EntityManager entityManager;
public final String TOPIC_SQL = " select * from MESSAGEHUB_TOPIC ";
public final String LUA_SCRIPT =
"redis.call('del', 'MESSAGEHUB_TOPIC')" +
"local topics = KEYS " +
"for i, v in pairs(topics) do " +
" redis.call('lpush', 'MESSAGEHUB_TOPIC', v) " +
"end";
@Override
public void run() {
try {
List<String> topics = this.getQueryResult(TOPIC_SQL, MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(topics)) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
Long result = stringRedisTemplate.execute(redisScript, topics);
log.info("reload topic finish");
}
} catch (Throwable t) {
log.error("messagehub topic reload error", t);
}
}
private <T> List<T> getQueryResult(String sql, Class<T> clazz) {
Query dataQuery = entityManager.createNativeQuery(sql, clazz);
List<T> result = new ArrayList<>();
List<Object> list = dataQuery.getResultList();
for (Object o : list) {
result.add((T) o);
}
return result;
}
}
定义一个timer任务,隔一段时间将mysql中的topic白名单通过lua脚本的方式刷新到指定的reids topic key中。还有一些可以优化的地方,比如同步topic的操作只需要一个服务即可,所以可以使用@ConditionalOnProperty注解判断是否需要进行同步topic。