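RocketMQ provides a client-side extension point: message hooks (ConsumeMessageHook and SendMessageHook).

However, when you use rocketmq-spring-boot-starter, neither RocketMQTemplate nor @RocketMQMessageListener offers a place to set a hook.

You have to wire the hooks in yourself.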


  • Consumer side: ConsumeMessageHook

    Key point: where the listener is initialized.

This can be seen in the source code.


RocketMQAutoConfiguration.java, the auto-configuration class

@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration {

Here you can see that it imports ListenerContainerConfiguration, the class that initializes the listener containers.


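ListenerContainerConfiguration.java implements the SmartInitializingSingleton interface,

and the listener containers are created in its afterSingletonsInstantiated method.

This is where the bean lifecycle matters: the hook can only be set after that phase has finished.

A small demo that verifies the lifecycle order is available at the link below: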

http://www.apple-dina.com/view/138 
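
The ordering can also be illustrated without Spring on the classpath. The sketch below is only a toy model: SingletonCallback, LifecycleCallback and ToyContext are hypothetical stand-ins, not the Spring types. It mirrors the order in which a Spring context refresh invokes the two callbacks (afterSingletonsInstantiated once all singletons exist, then start on lifecycle beans), regardless of the order in which the beans were registered.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-ins for the two Spring callbacks (hypothetical, not the Spring interfaces).
interface SingletonCallback { void afterSingletonsInstantiated(); }
interface LifecycleCallback { void start(); }

class ToyContext {
    final List<String> events = new ArrayList<>();
    private final List<SingletonCallback> singletonCallbacks = new ArrayList<>();
    private final List<LifecycleCallback> lifecycleCallbacks = new ArrayList<>();

    void register(Object bean) {
        if (bean instanceof SingletonCallback) singletonCallbacks.add((SingletonCallback) bean);
        if (bean instanceof LifecycleCallback) lifecycleCallbacks.add((LifecycleCallback) bean);
    }

    // Mirrors the refresh order: singleton callbacks first, lifecycle start second.
    void refresh() {
        singletonCallbacks.forEach(SingletonCallback::afterSingletonsInstantiated);
        lifecycleCallbacks.forEach(LifecycleCallback::start);
    }
}

class LifecycleOrderDemo {
    static List<String> run() {
        ToyContext ctx = new ToyContext();
        // Registered in the "wrong" order on purpose: the hook registrar comes first.
        ctx.register((LifecycleCallback) () -> ctx.events.add("start: register hook"));
        ctx.register((SingletonCallback) () ->
                ctx.events.add("afterSingletonsInstantiated: create container"));
        ctx.refresh();
        return ctx.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the toy model, container creation always runs before hook registration. This is the property the real configuration relies on.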


@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

public void afterSingletonsInstantiated() {
    Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> {
        return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());
    }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
    beans.forEach(this::registerContainer);
}
private void registerContainer(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
    if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
    } else if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
    } else {
        RocketMQMessageListener annotation = (RocketMQMessageListener)clazz.getAnnotation(RocketMQMessageListener.class);
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled = (Boolean)((Map)this.rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)).getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic);
        } else {
            this.validate(annotation);
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), this.counter.incrementAndGet());
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext)this.applicationContext;
            genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {
                return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);
            }, new BeanDefinitionCustomizer[0]);
            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
            if (!container.isRunning()) {
                try {
                    container.start();
                } catch (Exception var12) {
                    log.error("Started container failed. {}", container, var12);
                    throw new RuntimeException(var12);
                }
            }
            log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }
    }
}


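My own test class for setting the ConsumeMessageHook

It implements:

ApplicationContextAware, to obtain the application context

SmartLifecycle, to set the hook in a lifecycle phase that runs after initialization has completed

This guarantees that the hook is set after afterSingletonsInstantiated.

Otherwise the listener containers would not have been initialized yet.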



@Component
public class RocketMqListenerConfiguration implements ApplicationContextAware, SmartLifecycle {
    private boolean isRunning = false;
    private ApplicationContext applicationContext;
    @Resource
    RocketConsumeMessageHook rocketConsumeMessageHook;
    @Resource
    ListenerContainerConfiguration listenerContainerConfiguration;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    @Override
    public void start() { // key extension point: runs after afterSingletonsInstantiated
        registerConsumerMessageHook();
        isRunning = true;
    }
    @Override
    public void stop() {
        isRunning = false;
    }
    @Override
    public boolean isRunning() {
        return isRunning;
    }

    private void registerConsumerMessageHook() {
        // getBeansOfType is available on ApplicationContext itself, so no cast is needed
        Map<String, DefaultRocketMQListenerContainer> containerMap =
                this.applicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);
        containerMap.forEach(
                (name, container) -> {
                    container.getConsumer().getDefaultMQPushConsumerImpl()
                            .registerConsumeMessageHook(rocketConsumeMessageHook);
                    System.out.println("consumer-container:" + name + " hook added");
                });
        System.out.println("------------------------- hooks added -------------------------");
    }

}



The custom RocketConsumeMessageHook.java simply writes a log entry here.

It could of course do other things as well.


@Component
public class RocketConsumeMessageHook implements ConsumeMessageHook {
    private final Logger logger = LoggerFactory.getLogger("rocketmq");
    @Override
    public String hookName() {
        return "RocketConsumeMessageHook";
    }
    @Override
    public void consumeMessageBefore(ConsumeMessageContext context) {
        for (MessageExt messageExt : context.getMsgList()) {
            logger.info("About to consume message, topic:{}, group:{}", messageExt.getTopic(), context.getConsumerGroup());
        }
    }
    @Override
    public void consumeMessageAfter(ConsumeMessageContext context) {
        for (MessageExt messageExt : context.getMsgList()) {
            logger.info("Message consumed, topic:{}, group:{}, success:{}, status:{}", messageExt.getTopic(),
                    context.getConsumerGroup(), context.isSuccess(), context.getStatus());
        }
    }
}
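
As one example of "other things", a hook could keep simple per-topic counters instead of only logging. The sketch below uses only the JDK. TopicConsumeStats and its method names are hypothetical, not part of RocketMQ. The assumption is that consumeMessageAfter would call stats.record(messageExt.getTopic(), context.isSuccess()) for each message in context.getMsgList().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: thread-safe per-topic success/failure counters.
class TopicConsumeStats {
    private final Map<String, LongAdder> success = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> failure = new ConcurrentHashMap<>();

    // Records one consumed message for the given topic.
    void record(String topic, boolean ok) {
        (ok ? success : failure)
                .computeIfAbsent(topic, t -> new LongAdder())
                .increment();
    }

    long successCount(String topic) {
        LongAdder adder = success.get(topic);
        return adder == null ? 0L : adder.sum();
    }

    long failureCount(String topic) {
        LongAdder adder = failure.get(topic);
        return adder == null ? 0L : adder.sum();
    }
}
```

Concurrent collections are used because hooks are invoked from the consumer's worker threads, so several messages may be recorded at the same time.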



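  • Producer side: SendMessageHook

This one is simpler. The approach above works, and so do others, because the producer is already created when RocketMQTemplate is instantiated.


RocketMQConfiguration.java follows the approach described above.

An @PostConstruct init method would also work.

I simply new up a hook here; you could also inject it via DI, or register a whole batch of hooks, depending on your needs.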


@Component
public class RocketMQConfiguration implements SmartLifecycle {
    private boolean isRunning = false;
    @Resource
    RocketMQTemplate rocketMQTemplate;
    public void registerSendMessageHook() {
        rocketMQTemplate.getProducer().getDefaultMQProducerImpl()
                .registerSendMessageHook(new RocketMqSendMessageHook());
        System.out.println("------------------------- hook added -------------------------");
    }
    @Override
    public void start() {
        registerSendMessageHook();
        isRunning = true;
    }
    @Override
    public void stop() {
        isRunning = false;
    }
    @Override
    public boolean isRunning() {
        return isRunning;
    }
}
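
For the "register a whole batch" variant, a minimal sketch is shown below, written generically so that it runs without RocketMQ on the classpath. HookRegistrar and registerAll are hypothetical names. In the real configuration the list could be a Spring-injected List<SendMessageHook> and the registrar could be the method reference rocketMQTemplate.getProducer().getDefaultMQProducerImpl()::registerSendMessageHook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: hands every hook in the list to a registrar callback.
class HookRegistrar {
    // Registers the hooks in list order and returns how many were registered.
    static <H> int registerAll(List<? extends H> hooks, Consumer<? super H> registrar) {
        int registered = 0;
        for (H hook : hooks) {
            registrar.accept(hook);
            registered++;
        }
        return registered;
    }

    public static void main(String[] args) {
        // Strings stand in for hook objects; the target list stands in for the producer.
        List<String> target = new ArrayList<>();
        int count = registerAll(List.of("logHook", "metricsHook"), target::add);
        System.out.println(count + " hooks registered: " + target);
    }
}
```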




RocketMqSendMessageHook.java also just logs a line here.

Any further extension is up to you.


public class RocketMqSendMessageHook implements SendMessageHook {
    private final Logger logger = LoggerFactory.getLogger("rocketmq");
    @Override
    public String hookName() {
        return "rocket_sendMessage_Hook";
    }
    @Override
    public void sendMessageBefore(SendMessageContext context) {
        logger.info("About to send message, topic:{}", context.getMessage().getTopic());
    }
    @Override
    public void sendMessageAfter(SendMessageContext context) {
        SendResult sendResult = context.getSendResult();
        // The result can be null (for example when the send threw an exception), so guard against it
        if (sendResult == null) {
            logger.warn("No send result, topic:{}, exception:{}", context.getMessage().getTopic(), context.getException());
            return;
        }
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("Message sent successfully, topic:{}", context.getMessage().getTopic());
        } else {
            logger.error("Message send failed, topic:{}, reason:{}", context.getMessage().getTopic(), sendResult.getSendStatus());
        }
    }
}


One last open question

rocketMQTemplate.getProducer().getDefaultMQProducerImpl()
container.getConsumer().getDefaultMQPushConsumerImpl()
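// Both methods above are marked as deprecated, but no replacement is provided; I don't know whether a new API will be offered for this in the future.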



