因需要配置多个MQ,并且采用Spring自动注入的方式会产生冲突,于是决定手动加载MQ。
当然还有一些问题未解决,比如采用自动注入Environment时,env获取为空,所以采用了懒加载的方式。
查看了JMS的部分源码,进行改写,适配自己特定环境下的使用。
好处是,按原jms及pool的配置属性进行配置就好了,只增加了一层别名信息。配置属性不明确时,可以借助Spring的配置提示,看看有哪些可用的配置属性。
//入口
BDMPActiveMQFactory.java
//ActiveMQ配置信息类
BDMPActiveMQProperties.java
//初始化
BDMPActiveMQConnectionFactoryConfiguration.java
//工厂
BDMPActiveMQConnectionFactoryFactory.java
public class BDMPActiveMQFactory { private static BDMPActiveMQFactory instance; static Map<String ,JmsMessagingTemplate> jmsMessagingTemplateMap = new ConcurrentHashMap<String ,JmsMessagingTemplate>(); static Map<String ,JmsTemplate> jmsTemplateMap = new ConcurrentHashMap<String ,JmsTemplate>(); static Map<String ,ActiveMQConnectionFactory> activeMQConnectionFactoryMap = new ConcurrentHashMap<String ,ActiveMQConnectionFactory>(); static Map<String ,JmsPoolConnectionFactory > jmsPoolConnectionFactoryMap = new ConcurrentHashMap<String ,JmsPoolConnectionFactory >(); private static Environment env; public static BDMPActiveMQFactory getInstance() { if(instance==null) { synchronized (BDMPActiveMQFactory.class) { instance = new BDMPActiveMQFactory(); if(env==null) { env = SpringBeanUtil.getBean(Environment.class); } instance.init(); } } return instance; } private BDMPActiveMQFactory(){ } private void init() { Map<String ,BDMPActiveMQProperties> proMap = new ConcurrentHashMap<String, BDMPActiveMQProperties>(); //初始化配置信息 String activemqs = env.getProperty("activemq.names",""); if(!StringUtils.isEmpty(activemqs)) { String[] names = activemqs.split(","); for (int i = 0; i < names.length; i++) { BDMPActiveMQProperties properties = new BDMPActiveMQProperties(); String brokerUrl = env.getProperty("activemq."+names[i]+".broker-url"); if (!StringUtils.isEmpty(brokerUrl)) { properties.setBrokerUrl(brokerUrl); } properties.setUser(env.getProperty("activemq."+names[i]+".user","")); properties.setPassword(env.getProperty("activemq."+names[i]+".password","")); String sendTimeout = env.getProperty("activemq."+names[i]+".send-timeout",""); if(!StringUtils.isEmpty(sendTimeout)) { try { properties.setSendTimeout(Duration.ofMillis(Long.parseLong(sendTimeout))); } catch (Exception e) { e.printStackTrace(); } } String nonBlockingRedelivery = env.getProperty("activemq."+names[i]+".non-blocking-redelivery"); if(nonBlockingRedelivery!=null) { 
properties.setNonBlockingRedelivery(Boolean.getBoolean(nonBlockingRedelivery)); } String closeTimeout = env.getProperty("activemq."+names[i]+".close-timeout",""); if(!StringUtils.isEmpty(closeTimeout)) { try { properties.setCloseTimeout(Duration.ofMillis(Long.parseLong(closeTimeout))); } catch (Exception e) { e.printStackTrace(); } } String inMemory = env.getProperty("activemq."+names[i]+".in-memory"); if(inMemory!=null) { properties.setInMemory(Boolean.getBoolean(inMemory)); } String trustAll = env.getProperty("activemq."+names[i]+".packages.trust-all"); if(trustAll!=null) { properties.getPackages().setTrustAll(Boolean.getBoolean(trustAll)); } List<String> trusted = env.getProperty("activemq."+names[i]+".packages.trusted", ArrayList.class); if(trusted!=null && trusted.size()>0) { properties.getPackages().setTrusted(trusted); } String poolenabled = env.getProperty("activemq."+names[i]+".pool.enabled"); if(poolenabled!=null && Boolean.getBoolean(poolenabled)) { properties.getPool().setEnabled(true); String maxConnections = env.getProperty("activemq."+names[i]+".pool.max-connections"); if(!StringUtils.isEmpty(maxConnections)) { properties.getPool().setMaxConnections(Integer.parseInt(maxConnections)); } String idleTimeout = env.getProperty("activemq."+names[i]+".pool.idle-timeout"); if(!StringUtils.isEmpty(idleTimeout)) { properties.getPool().setIdleTimeout(Duration.ofSeconds(Long.parseLong(idleTimeout))); } properties.getPool().setBlockIfFull(Boolean.getBoolean(env.getProperty("activemq."+names[i]+".pool.block-if-full"))); String blockIfFullTimeout = env.getProperty("activemq."+names[i]+".pool.block-if-full-timeout"); if(!StringUtils.isEmpty(blockIfFullTimeout)) { properties.getPool().setBlockIfFullTimeout(Duration.ofMillis(Long.parseLong(blockIfFullTimeout))); } String maxSessionsPerConnection = env.getProperty("activemq."+names[i]+".pool.max-sessions-per-connection"); if(!StringUtils.isEmpty(maxSessionsPerConnection)) { 
properties.getPool().setMaxSessionsPerConnection(Integer.parseInt(maxSessionsPerConnection)); } String timeBetweenExpirationCheck = env.getProperty("activemq."+names[i]+".pool.time-between-expiration-check"); if(!StringUtils.isEmpty(timeBetweenExpirationCheck)) { properties.getPool().setTimeBetweenExpirationCheck(Duration.ofMillis(Long.parseLong(timeBetweenExpirationCheck))); } String useAnonymousProducers = env.getProperty("activemq."+names[i]+".pool.use-anonymous-producers"); if(!StringUtils.isEmpty(useAnonymousProducers)) { properties.getPool().setUseAnonymousProducers(Boolean.getBoolean(useAnonymousProducers)); } } proMap.put(names[i], properties); } } BDMPActiveMQConnectionFactoryConfiguration cofig = new BDMPActiveMQConnectionFactoryConfiguration(); for (Map.Entry<String, BDMPActiveMQProperties> entry : proMap.entrySet()) { ConnectionFactory connectionFactory = null; BDMPActiveMQProperties pro = entry.getValue(); if(pro.getPool().isEnabled()) { connectionFactory = cofig.pooledJmsConnectionFactory(pro, null,activeMQConnectionFactoryMap,entry.getKey()); jmsPoolConnectionFactoryMap.put(entry.getKey(), (JmsPoolConnectionFactory) connectionFactory); }else { connectionFactory = cofig.jmsConnectionFactory(pro, null); activeMQConnectionFactoryMap.put(entry.getKey(), (ActiveMQConnectionFactory) connectionFactory); } JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(jmsTemplate); jmsTemplateMap.put(entry.getKey(), jmsTemplate); jmsMessagingTemplateMap.put(entry.getKey(), jmsMessagingTemplate); } } public JmsMessagingTemplate getJmsMessagingTemplateByName(String name) { return jmsMessagingTemplateMap.get(name); } public JmsTemplate getJmsTemplateByName(String name) { return jmsTemplateMap.get(name); } public Map<String ,JmsTemplate> getJmsTemplateMap() { return jmsTemplateMap; } public Map<String ,ActiveMQConnectionFactory> getActiveMQConnectionFactoryMap() { return 
activeMQConnectionFactoryMap; } public ActiveMQConnectionFactory getActiveMQConnectionFactoryByName(String name) { return activeMQConnectionFactoryMap.get(name); } }
/**
 * Settings for a single ActiveMQ broker: connection URL, credentials, timeouts, connection-pool
 * options and the trusted-package configuration.
 */
class BDMPActiveMQProperties {

    /** Broker URL; when left {@code null} a default URL is chosen elsewhere. */
    private String brokerUrl;

    /** Whether the default broker URL should be the in-memory one. Ignored if a broker URL is set. */
    private boolean inMemory = true;

    /** Login user of the broker. */
    private String user;

    /** Login password of the broker. */
    private String password;

    /** How long to wait before a close is considered complete. */
    private Duration closeTimeout = Duration.ofSeconds(15);

    /**
     * Whether to stop message delivery before re-delivering messages from a rolled back
     * transaction. Enabling this means message order is not preserved.
     */
    private boolean nonBlockingRedelivery = false;

    /** How long a send waits for a response; 0 means wait forever. */
    private Duration sendTimeout = Duration.ofMillis(0);

    /** Connection-pool settings. */
    private JmsPoolConnectionFactoryProperties pool = new JmsPoolConnectionFactoryProperties();

    /** Trusted-package settings. */
    private Packages packages = new Packages();

    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    public boolean isInMemory() {
        return inMemory;
    }

    public void setInMemory(boolean inMemory) {
        this.inMemory = inMemory;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Duration getCloseTimeout() {
        return closeTimeout;
    }

    public void setCloseTimeout(Duration closeTimeout) {
        this.closeTimeout = closeTimeout;
    }

    public boolean isNonBlockingRedelivery() {
        return nonBlockingRedelivery;
    }

    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
        this.nonBlockingRedelivery = nonBlockingRedelivery;
    }

    public Duration getSendTimeout() {
        return sendTimeout;
    }

    public void setSendTimeout(Duration sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    JmsPoolConnectionFactoryProperties getPool() {
        return pool;
    }

    Packages getPackages() {
        return packages;
    }

    /** Which packages are trusted. */
    public static class Packages {

        /** Whether to trust all packages; {@code null} when not configured. */
        private Boolean trustAll;

        /** Specific packages to trust (when not trusting all packages). */
        private List<String> trusted = new ArrayList<>();

        public Boolean getTrustAll() {
            return trustAll;
        }

        public void setTrustAll(Boolean trustAll) {
            this.trustAll = trustAll;
        }

        public List<String> getTrusted() {
            return trusted;
        }

        public void setTrusted(List<String> trusted) {
            this.trusted = trusted;
        }
    }
}
/**
 * Builds plain and pooled ActiveMQ connection factories from a {@link BDMPActiveMQProperties}.
 *
 * <p>Every method accepts a {@code null} customizer provider, which is treated as "no customizers".
 */
class BDMPActiveMQConnectionFactoryConfiguration {

    /**
     * Creates a non-pooled connection factory.
     *
     * @param properties         broker settings
     * @param factoryCustomizers optional customizers; may be {@code null}
     * @return the configured connection factory
     */
    ActiveMQConnectionFactory jmsConnectionFactory(BDMPActiveMQProperties properties,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        return createJmsConnectionFactory(properties, factoryCustomizers);
    }

    /**
     * Creates a pooled connection factory and records the underlying ActiveMQ factory.
     *
     * @param pro                broker settings, including the pool settings
     * @param factoryCustomizers optional customizers; may be {@code null}
     * @param factoryMap         receives the underlying (non-pooled) factory under {@code key}
     * @param key                key used for the {@code factoryMap} entry
     * @return the pooled connection factory wrapping the underlying one
     */
    JmsPoolConnectionFactory pooledJmsConnectionFactory(BDMPActiveMQProperties pro,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers,
            Map<String, ? super ActiveMQConnectionFactory> factoryMap, String key) {
        ActiveMQConnectionFactory connectionFactory = createJmsConnectionFactory(pro, factoryCustomizers);
        factoryMap.put(key, connectionFactory);
        return new JmsPoolConnectionFactoryFactory(pro.getPool())
                .createPooledConnectionFactory(connectionFactory);
    }

    /**
     * Creates a pooled connection factory.
     *
     * @param pro                broker settings, including the pool settings
     * @param factoryCustomizers optional customizers; may be {@code null}
     * @return the pooled connection factory
     */
    JmsPoolConnectionFactory pooledJmsConnectionFactory(BDMPActiveMQProperties pro,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        ActiveMQConnectionFactory connectionFactory = createJmsConnectionFactory(pro, factoryCustomizers);
        return new JmsPoolConnectionFactoryFactory(pro.getPool())
                .createPooledConnectionFactory(connectionFactory);
    }

    /** Shared creation path; a {@code null} provider is passed on as a {@code null} customizer list. */
    private static ActiveMQConnectionFactory createJmsConnectionFactory(BDMPActiveMQProperties properties,
            ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        return new BDMPActiveMQConnectionFactoryFactory(properties,
                factoryCustomizers == null
                        ? null
                        : factoryCustomizers.orderedStream().collect(Collectors.toList()))
                .createConnectionFactory(ActiveMQConnectionFactory.class);
    }
}
/**
 * Creates {@link ActiveMQConnectionFactory} instances (or subclasses) from a
 * {@link BDMPActiveMQProperties} and applies the supplied customizers to them.
 */
class BDMPActiveMQConnectionFactoryFactory {

    private static final String DEFAULT_EMBEDDED_BROKER_URL = "vm://localhost?broker.persistent=false";

    private static final String DEFAULT_NETWORK_BROKER_URL = "tcp://localhost:61616";

    private final BDMPActiveMQProperties properties;

    private final List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers;

    /**
     * @param pro                broker settings; must not be {@code null}
     * @param factoryCustomizers customizers to apply; {@code null} is treated as an empty list
     */
    BDMPActiveMQConnectionFactoryFactory(BDMPActiveMQProperties pro,
            List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
        Assert.notNull(pro, "Properties must not be null");
        this.properties = pro;
        if (factoryCustomizers == null) {
            this.factoryCustomizers = Collections.emptyList();
        } else {
            this.factoryCustomizers = factoryCustomizers;
        }
    }

    /**
     * Instantiates and configures a connection factory of the requested type.
     *
     * @param factoryClass concrete factory class to instantiate
     * @return the configured and customized factory
     * @throws IllegalStateException if instantiation or configuration fails
     */
    <T extends ActiveMQConnectionFactory> T createConnectionFactory(Class<T> factoryClass) {
        try {
            return doCreateConnectionFactory(factoryClass);
        } catch (Exception ex) {
            throw new IllegalStateException("Unable to create ActiveMQConnectionFactory", ex);
        }
    }

    /** Instantiates the factory, copies the settings onto it, then runs the customizers. */
    private <T extends ActiveMQConnectionFactory> T doCreateConnectionFactory(Class<T> factoryClass)
            throws Exception {
        T created = createConnectionFactoryInstance(factoryClass);

        if (properties.getCloseTimeout() != null) {
            long closeMillis = properties.getCloseTimeout().toMillis();
            created.setCloseTimeout((int) closeMillis);
        }
        created.setNonBlockingRedelivery(properties.isNonBlockingRedelivery());
        if (properties.getSendTimeout() != null) {
            long sendMillis = properties.getSendTimeout().toMillis();
            created.setSendTimeout((int) sendMillis);
        }

        Packages trustSettings = properties.getPackages();
        if (trustSettings.getTrustAll() != null) {
            created.setTrustAllPackages(trustSettings.getTrustAll());
        }
        if (!trustSettings.getTrusted().isEmpty()) {
            created.setTrustedPackages(trustSettings.getTrusted());
        }

        customize(created);
        return created;
    }

    /**
     * Calls the (user, password, url) constructor when both credentials are non-empty, and the
     * (url) constructor otherwise.
     */
    private <T extends ActiveMQConnectionFactory> T createConnectionFactoryInstance(Class<T> factoryClass)
            throws InstantiationException, IllegalAccessException, InvocationTargetException,
            NoSuchMethodException {
        String url = determineBrokerUrl();
        String login = properties.getUser();
        String secret = properties.getPassword();

        boolean hasCredentials = StringUtils.hasLength(login) && StringUtils.hasLength(secret);
        if (!hasCredentials) {
            return factoryClass.getConstructor(String.class).newInstance(url);
        }
        return factoryClass.getConstructor(String.class, String.class, String.class)
                .newInstance(login, secret, url);
    }

    /** Applies every registered customizer, in list order. */
    private void customize(ActiveMQConnectionFactory connectionFactory) {
        factoryCustomizers.forEach(customizer -> customizer.customize(connectionFactory));
    }

    /**
     * @return the configured broker URL if set; otherwise the embedded default when in-memory is
     *         enabled, or the network default when it is not
     */
    String determineBrokerUrl() {
        String configured = properties.getBrokerUrl();
        if (configured != null) {
            return configured;
        }
        return properties.isInMemory() ? DEFAULT_EMBEDDED_BROKER_URL : DEFAULT_NETWORK_BROKER_URL;
    }
}
配置文件
activemq:
  names: dx,wt,sx
  dx:
    broker-url:
    user: 1
    password: 1
    pool:
      max-connections: 5
      enabled: true
      idle-timeout: 3000
  wt:
    broker-url: tcp://localhost:61616
    user: 1
    password: 1
    pool:
      max-connections: 5
      enabled: true
      idle-timeout: 3000
  sx:
    broker-url: tcp://localhost:61616
    user: 1
    password: 1
    pool:
      max-connections: 5
      enabled: true
      idle-timeout: 3000
乐享:知识积累,快乐无限。