rocketmq提供了客户端扩展点messageHook
但是使用rocketmq-spring-boot-starter时
提供的RocketMQTemplate与RocketMQMessageListener却没有设置hook的位置
此时只能自己解决了。
消费端的 ConsumeMessageHook
关键点:listener的初始化位置
可以查看源码
RocketMQAutoConfiguration.java初始化类
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration {
这里可以看到ListenerContainerConfiguration listenerContainer的初始化类ListenerContainerConfiguration.java 类 实现了 SmartInitializingSingleton 接口
而创建listener是在afterSingletonsInstantiated 方法中创建的。
这里就涉及到生命周期的问题了。我们必须在该阶段过后在去设置hook
生命周期验证的小demo可以看下面的链接
http://www.apple-dina.com/view/138
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
public void afterSingletonsInstantiated() {
Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> {
return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
beans.forEach(this::registerContainer);
}
private void registerContainer(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
} else if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
} else {
RocketMQMessageListener annotation = (RocketMQMessageListener)clazz.getAnnotation(RocketMQMessageListener.class);
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled = (Boolean)((Map)this.rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)).getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic);
} else {
this.validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), this.counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext)this.applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {
return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);
}, new BeanDefinitionCustomizer[0]);
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception var12) {
log.error("Started container failed. {}", container, var12);
throw new RuntimeException(var12);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
}
}自己写的测试类设置ConsumeMessageHook的方法
我们实现
ApplicationContextAware接口用于获取上下文
SmartLifecycle 用于在初始化完成后的生命周期去设置hook
保证设置hook是在afterSingletonsInstantiated之后。
不然listenerContainer还没有初始化。
@Component
public class RocketMqListenerConfiguration implements ApplicationContextAware, SmartLifecycle {
private boolean isRunning = false;
private ApplicationContext applicationContext;
@Resource
RocketConsumeMessageHook rocketConsumeMessageHook;
@Resource
ListenerContainerConfiguration listenerContainerConfiguration;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void start() {//关键扩展点
registerConsumerMessageHook();
isRunning = true;
}
@Override
public void stop() {
}
@Override
public boolean isRunning() {
return isRunning;
}
private void registerConsumerMessageHook() {
System.out.println("-------------------------------------添加hook完成---------------------------------");
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) this.applicationContext;
Map<String, DefaultRocketMQListenerContainer> containerMap =
genericApplicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);
containerMap.forEach(
(name, container) -> {
container.getConsumer().getDefaultMQPushConsumerImpl()
.registerConsumeMessageHook(rocketConsumeMessageHook);
System.out.println("consumer-container:" + name + " 添加hook ");
});
}
}自定义的RocketConsumeMessageHook.java 这里就是简单打了下日志
当然也可以干点别的
@Component
public class RocketConsumeMessageHook implements ConsumeMessageHook {
private final Logger logger = LoggerFactory.getLogger("rocketmq");
@Override
public String hookName() {
return "RocketConsumeMessageHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
for (MessageExt messageExt : context.getMsgList()) {
logger.info("准备消费消息,topic:{}, group:{}", messageExt.getTopic(), context.getConsumerGroup());
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
for (MessageExt messageExt : context.getMsgList()) {
logger.info("消息消费完毕,topic:{}, group:{}, success:{}, status:{}", messageExt.getTopic(),
context.getConsumerGroup(), context.isSuccess(), context.getStatus());
}
}
}发送端的SendMessageHook
这个就比较简单了,用上面的方式也行,用其它的也行,因为RocketMQTemplate实在实例化时就创建了producer
RocketMQConfiguration.java 这个就是按上面写的。
也可以用@PostConstruct初始化方法
我是直接new了个hook 当然也可以使用DI或者搞一堆塞进去,看你怎么用吧
@Component
public class RocketMQConfiguration implements SmartLifecycle {
private boolean isRunning = false;
@Resource
RocketMQTemplate rocketMQTemplate;
public void registerSendMessageHook() {
System.out.println("-------------------------------------添加hook完成---------------------------------");
rocketMQTemplate.getProducer().getDefaultMQProducerImpl()
.registerSendMessageHook(new RocketMqSendMessageHook());
}
@Override
public void start() {
registerSendMessageHook();
isRunning = true;
}
@Override
public void stop() {
}
@Override
public boolean isRunning() {
return isRunning;
}
}RocketMqSendMessageHook.java 这里也是简单的打印了个日志
当然扩展别的就看自己了
public class RocketMqSendMessageHook implements SendMessageHook {
Logger logger = LoggerFactory.getLogger("rocketmq");
@Override
public String hookName() {
return "rocket_sendMessage_Hook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
logger.info("准备发送消息,topic:{}", context.getMessage().getTopic());
}
@Override
public void sendMessageAfter(SendMessageContext context) {
SendResult sendResult = context.getSendResult();
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
logger.info("消息发送成功,topic:{}", context.getMessage().getTopic());
} else {
logger.error("消息发送失败,topic:{}, reason:{}", context.getMessage().getTopic(), sendResult.getSendStatus());
}
}
}最后有个疑惑
rocketMQTemplate.getProducer().getDefaultMQProducerImpl() container.getConsumer().getDefaultMQPushConsumerImpl() //上面两个方法都标记为了过期方法,但是有没有提供对应的方法,不知道未来会不会有新的api来操作
乐享:知识积累,快乐无限。