Spring cloud 集成ActiveMQ 中
JmsMessagingTemplate.convertSendAndReceive(String destinationName, Object request, Class<Msg> targetClass) ;的用法
同步消息在接收端的处理。
问题点在于怎么讲处理完成后的结果返回到结果队列中去。
结果队列到底是什么?
在reveive处理完信息后返回destination 的获取。
通过源码的方法查看,最终于JmsTemplate 中找到关键信息
TemporaryQueue responseQueue = null; MessageProducer producer = null; MessageConsumer consumer = null; try { Message requestMessage = messageCreator.createMessage(session); responseQueue = session.createTemporaryQueue(); producer = session.createProducer(destination); consumer = session.createConsumer(responseQueue); if (logger.isDebugEnabled()) { logger.debug("Sending created message: " + requestMessage); } doSend(producer, requestMessage); return receiveFromConsumer(consumer, getReceiveTimeout()); }
可以看出声明了一个TemporaryQueue作为返回队列
第一步我们看看返回队列怎么出来的
TemporaryQueue responseQueue = null; responseQueue = session.createTemporaryQueue();
createTemporaryQueue() 方法跟下去
ActiveMQConnection connection; (TemporaryQueue)connection.createTempDestination(false);
方法内部
dest = new ActiveMQTempQueue (info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); ActiveMQTempQueue ActiveMQTempDestination
跟下去,你会在ActiveMQDestination 里面看到
public static final String QUEUE_QUALIFIED_PREFIX = "queue://"; public static final String TOPIC_QUALIFIED_PREFIX = "topic://"; public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://"; public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
ActiveMQTempDestination 里面
到这里大概就知道队列名字长啥样了,下面是一个真实的
temp-queue://ID:hostname-43682-1587991031774-3:1:1
第二步,我们要看看,返回队列的信息到底存放在哪里了。
然后这里,我们可以看到返回队列信息存放到了请求消息里面。
requestMessage.setJMSReplyTo(responseQueue);
进入方法,看看到底怎么实现的。
ActiveMQDestination 到该类。
this.setReplyTo(ActiveMQDestination.transform(destination));
我们来看看transform
if (dest instanceof xx) 会看到一堆类型判断
我们需要看下面这的处理
if (dest instanceof TemporaryQueue) { return new ActiveMQTempQueue(((TemporaryQueue) dest).getQueueName()); }
TemporaryQueue .getQueueName() 方法
我去,只是放到了一个成员变量里面了。
protected ActiveMQDestination replyTo;
好吧,然后就悲剧了,通过template.receive方法,我们使用不带转换的,因为转换的就是你自己的信息了,拿不到方法的。
Message message = template.receive(destination); Message只有两个方法,getpayload ,getheaders 那只能在header里面了。 MessageHeaders header = message.getHeaders(); 好吧,那我们试试 template.convertAndSend((Queue)header.getReplyChannel(),resMsg); Exception in thread "Thread-7" java.lang.UnsupportedOperationException: A destination must be specified.
完蛋那明显不是了。
那就只能是存在headers的map里面那,那key值到底是什么呢。
那么我们此时就要看看header到底是怎么创建的了。
Message requestMessage = messageCreator.createMessage(session);
从这里我们看到messageCreator是参数传进来的。
我们返回去看JmsMessagingTemplate
javax.jms.Message jmsMessage = obtainJmsTemplate() .sendAndReceive(destinationName, createMessageCreator(requestMessage)); return convertJmsMessage(jmsMessage);
这里我们看到message的创建createMessageCreator(requestMessage)
new MessagingMessageCreator(message, getJmsMessageConverter());
这里getJmsMessageConverter()
MessageConverter jmsMessageConverter = new MessagingMessageConverter();
MessagingMessageConverter这个类出现了
构造方法
public MessagingMessageConverter() { this(new SimpleMessageConverter(), new SimpleJmsHeaderMapper()); }
SimpleJmsHeaderMapper 是不是有点意思了。进去看看这个类。
发现这个方法
@Override public void fromHeaders(MessageHeaders headers, javax.jms.Message jmsMessage)
方法里面
Destination jmsReplyTo = getHeaderIfAvailable(headers, JmsHeaders.REPLY_TO, Destination.class);
有这个东东 jmsReplyTo 是不是很眼熟。再看JmsHeaders.REPLY_TO header啊
进去看看
String PREFIX = "jms_"; String REPLY_TO = PREFIX + "replyTo";
那JmsHeaders 的REPLY_TO 就是jms_ replyTo 它了 ,是不是很怀疑了,那试试。
template.convertAndSend((Queue)header.get("jms_replyTo"),resMsg);
ok ,果真是这个。
name此时我们知道 MessagingMessageCreator 里面包含了上面那些东西
MessagingMessageCreator. createMessage 方法
return this.messageConverter.toMessage(this.message, session);
方法内下面这个方法,果真调用了fromHeaders;
this.headerMapper.fromHeaders(headers, reply);
方法内部
Set<Map.Entry<String, Object>> entries = headers.entrySet(); for (Map.Entry<String, Object> entry : entries) { String headerName = entry.getKey(); if (StringUtils.hasText(headerName) && !headerName.startsWith(JmsHeaders.PREFIX)) { Object value = entry.getValue(); if (value != null && SUPPORTED_PROPERTY_TYPES.contains(value.getClass())) { try { String propertyName = this.fromHeaderName(headerName); jmsMessage.setObjectProperty(propertyName, value); }
一个循环将headers信息存入了message中。
嗯,不想跟了,这里其实还是很不清楚。但是我已经知道map里面存放的返回队列信息在headers 里面的key值是jms_replyTo了…
乐享:知识积累,快乐无限。