Spring cloud 集成ActiveMQ

JmsMessagingTemplate.convertSendAndReceive(String destinationName, Object request, Class<Msg> targetClass) ;的用法

同步消息在接收端的处理。

问题点在于怎么讲处理完成后的结果返回到结果队列中去。

结果队列到底是什么?

reveive处理完信息后返回destination 的获取。

 

通过源码的方法查看,最终于JmsTemplate 中找到关键信息

 

TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
if (logger.isDebugEnabled()) {
   logger.debug("Sending created message: " + requestMessage);
                            }
   doSend(producer, requestMessage);
   return receiveFromConsumer(consumer, getReceiveTimeout());
}


 

可以看出声明了一个TemporaryQueue作为返回队列

 

第一步我们看看返回队列怎么出来的

TemporaryQueue responseQueue = null;
responseQueue = session.createTemporaryQueue();


createTemporaryQueue() 方法跟下去

ActiveMQConnection connection;
 
(TemporaryQueue)connection.createTempDestination(false);


方法内部

dest = new ActiveMQTempQueue
(info.getConnectionId(), 
tempDestinationIdGenerator.getNextSequenceId());
ActiveMQTempQueue ActiveMQTempDestination

 

跟下去,你会在ActiveMQDestination 里面看到

public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";


ActiveMQTempDestination 里面

到这里大概就知道队列名字长啥样了,下面是一个真实的

temp-queue://ID:hostname-43682-1587991031774-3:1:1

 

第二步,我们要看看,返回队列的信息到底存放在哪里了。

然后这里,我们可以看到返回队列信息存放到了请求消息里面。

requestMessage.setJMSReplyTo(responseQueue);


进入方法,看看到底怎么实现的。

 

ActiveMQDestination 到该类。

this.setReplyTo(ActiveMQDestination.transform(destination));


我们来看看transform

 

if (dest instanceof xx) 会看到一堆类型判断

我们需要看下面这的处理

if (dest instanceof TemporaryQueue) {
  return new ActiveMQTempQueue(((TemporaryQueue) dest).getQueueName());
 }


TemporaryQueue .getQueueName() 方法

我去,只是放到了一个成员变量里面了。

protected ActiveMQDestination replyTo;

 

好吧,然后就悲剧了,通过template.receive方法,我们使用不带转换的,因为转换的就是你自己的信息了,拿不到方法的。

Message message = template.receive(destination);
Message只有两个方法,getpayload ,getheaders  那只能在header里面了。
MessageHeaders  header = message.getHeaders();
好吧,那我们试试
template.convertAndSend((Queue)header.getReplyChannel(),resMsg);
 
Exception in thread "Thread-7" java.lang.UnsupportedOperationException: 
A destination must be specified.


完蛋那明显不是了。

那就只能是存在headersmap里面那,那key值到底是什么呢。

 

那么我们此时就要看看header到底是怎么创建的了。

Message requestMessage = messageCreator.createMessage(session);


从这里我们看到messageCreator是参数传进来的。

我们返回去看JmsMessagingTemplate

javax.jms.Message jmsMessage = 
obtainJmsTemplate()
.sendAndReceive(destinationName, createMessageCreator(requestMessage));
return convertJmsMessage(jmsMessage);


 

这里我们看到message的创建createMessageCreator(requestMessage)

new MessagingMessageCreator(message, getJmsMessageConverter());


这里getJmsMessageConverter()

MessageConverter jmsMessageConverter = new MessagingMessageConverter();


MessagingMessageConverter这个类出现了

构造方法

public MessagingMessageConverter() {
 this(new SimpleMessageConverter(), new SimpleJmsHeaderMapper());
}


SimpleJmsHeaderMapper 是不是有点意思了。进去看看这个类。

发现这个方法

@Override
public void fromHeaders(MessageHeaders headers,
 javax.jms.Message jmsMessage)


方法里面

Destination jmsReplyTo = 
getHeaderIfAvailable(headers, JmsHeaders.REPLY_TO, Destination.class);


有这个东东  jmsReplyTo  是不是很眼熟。再看JmsHeaders.REPLY_TO header

进去看看

String PREFIX = "jms_";
String REPLY_TO = PREFIX + "replyTo";


JmsHeaders REPLY_TO  就是jms_ replyTo 它了 ,是不是很怀疑了,那试试。

template.convertAndSend((Queue)header.get("jms_replyTo"),resMsg);


ok ,果真是这个。

 

name此时我们知道  MessagingMessageCreator 里面包含了上面那些东西

MessagingMessageCreator. createMessage 方法

return this.messageConverter.toMessage(this.message, session);

方法内下面这个方法,果真调用了fromHeaders

this.headerMapper.fromHeaders(headers, reply);

方法内部

Set<Map.Entry<String, Object>> entries = headers.entrySet();
 for (Map.Entry<String, Object> entry : entries) {
    String headerName = entry.getKey();
      if (StringUtils.hasText(headerName) && 
      !headerName.startsWith(JmsHeaders.PREFIX)) {
         Object value = entry.getValue();
           if (value != null && 
           SUPPORTED_PROPERTY_TYPES.contains(value.getClass())) {
             try {
              String propertyName = this.fromHeaderName(headerName);
              jmsMessage.setObjectProperty(propertyName, value);
             }


一个循环将headers信息存入了message中。

嗯,不想跟了,这里其实还是很不清楚。但是我已经知道map里面存放的返回队列信息在headers 里面的key值是jms_replyTo



乐享:知识积累,快乐无限。