rocketmq提供了客户端扩展点messageHook
但是使用rocketmq-spring-boot-starter时
提供的RocketMQTemplate与RocketMQMessageListener却没有设置hook的位置
此时只能自己解决了。
消费端的 ConsumeMessageHook
关键点:listener的初始化位置
可以查看源码
RocketMQAutoConfiguration.java初始化类
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class}) @AutoConfigureAfter({MessageConverterConfiguration.class}) @AutoConfigureBefore({RocketMQTransactionConfiguration.class}) public class RocketMQAutoConfiguration { 这里可以看到ListenerContainerConfiguration listenerContainer的初始化类
ListenerContainerConfiguration.java 类 实现了 SmartInitializingSingleton 接口
而创建listener是在afterSingletonsInstantiated 方法中创建的。
这里就涉及到生命周期的问题了。我们必须在该阶段过后在去设置hook
生命周期验证的小demo可以看下面的链接
http://www.apple-dina.com/view/138
@Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { public void afterSingletonsInstantiated() { Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> { return !ScopedProxyUtils.isScopedTarget((String)entry.getKey()); }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); beans.forEach(this::registerContainer); } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName()); } else if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName()); } else { RocketMQMessageListener annotation = (RocketMQMessageListener)clazz.getAnnotation(RocketMQMessageListener.class); String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (Boolean)((Map)this.rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)).getOrDefault(topic, true); if (!listenerEnabled) { log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); } else { this.validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), this.counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)this.applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> { return this.createRocketMQListenerContainer(containerBeanName, bean, annotation); }, new BeanDefinitionCustomizer[0]); DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception var12) { log.error("Started container failed. {}", container, var12); throw new RuntimeException(var12); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } } }
自己写的测试类设置ConsumeMessageHook的方法
我们实现
ApplicationContextAware接口用于获取上下文
SmartLifecycle 用于在初始化完成后的生命周期去设置hook
保证设置hook是在afterSingletonsInstantiated之后。
不然listenerContainer还没有初始化。
@Component public class RocketMqListenerConfiguration implements ApplicationContextAware, SmartLifecycle { private boolean isRunning = false; private ApplicationContext applicationContext; @Resource RocketConsumeMessageHook rocketConsumeMessageHook; @Resource ListenerContainerConfiguration listenerContainerConfiguration; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public void start() {//关键扩展点 registerConsumerMessageHook(); isRunning = true; } @Override public void stop() { } @Override public boolean isRunning() { return isRunning; } private void registerConsumerMessageHook() { System.out.println("-------------------------------------添加hook完成---------------------------------"); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) this.applicationContext; Map<String, DefaultRocketMQListenerContainer> containerMap = genericApplicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class); containerMap.forEach( (name, container) -> { container.getConsumer().getDefaultMQPushConsumerImpl() .registerConsumeMessageHook(rocketConsumeMessageHook); System.out.println("consumer-container:" + name + " 添加hook "); }); } }
自定义的RocketConsumeMessageHook.java 这里就是简单打了下日志
当然也可以干点别的
@Component public class RocketConsumeMessageHook implements ConsumeMessageHook { private final Logger logger = LoggerFactory.getLogger("rocketmq"); @Override public String hookName() { return "RocketConsumeMessageHook"; } @Override public void consumeMessageBefore(ConsumeMessageContext context) { for (MessageExt messageExt : context.getMsgList()) { logger.info("准备消费消息,topic:{}, group:{}", messageExt.getTopic(), context.getConsumerGroup()); } } @Override public void consumeMessageAfter(ConsumeMessageContext context) { for (MessageExt messageExt : context.getMsgList()) { logger.info("消息消费完毕,topic:{}, group:{}, success:{}, status:{}", messageExt.getTopic(), context.getConsumerGroup(), context.isSuccess(), context.getStatus()); } } }
发送端的SendMessageHook
这个就比较简单了,用上面的方式也行,用其它的也行,因为RocketMQTemplate实在实例化时就创建了producer
RocketMQConfiguration.java 这个就是按上面写的。
也可以用@PostConstruct初始化方法
我是直接new了个hook 当然也可以使用DI或者搞一堆塞进去,看你怎么用吧
@Component public class RocketMQConfiguration implements SmartLifecycle { private boolean isRunning = false; @Resource RocketMQTemplate rocketMQTemplate; public void registerSendMessageHook() { System.out.println("-------------------------------------添加hook完成---------------------------------"); rocketMQTemplate.getProducer().getDefaultMQProducerImpl() .registerSendMessageHook(new RocketMqSendMessageHook()); } @Override public void start() { registerSendMessageHook(); isRunning = true; } @Override public void stop() { } @Override public boolean isRunning() { return isRunning; } }
RocketMqSendMessageHook.java 这里也是简单的打印了个日志
当然扩展别的就看自己了
public class RocketMqSendMessageHook implements SendMessageHook { Logger logger = LoggerFactory.getLogger("rocketmq"); @Override public String hookName() { return "rocket_sendMessage_Hook"; } @Override public void sendMessageBefore(SendMessageContext context) { logger.info("准备发送消息,topic:{}", context.getMessage().getTopic()); } @Override public void sendMessageAfter(SendMessageContext context) { SendResult sendResult = context.getSendResult(); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { logger.info("消息发送成功,topic:{}", context.getMessage().getTopic()); } else { logger.error("消息发送失败,topic:{}, reason:{}", context.getMessage().getTopic(), sendResult.getSendStatus()); } } }
最后有个疑惑
rocketMQTemplate.getProducer().getDefaultMQProducerImpl() container.getConsumer().getDefaultMQPushConsumerImpl() //上面两个方法都标记为了过期方法,但是有没有提供对应的方法,不知道未来会不会有新的api来操作
乐享:知识积累,快乐无限。