因需要配置多个 MQ,而采用 Spring 自动注入的方式会产生冲突,所以决定手动加载 MQ。

当然还有一些问题未解决,比如采用自动注入 Environment 时,env 获取为空,所以采用了懒加载的方式。

查看了JMS的部分源码,进行改写,适配自己特定环境下的使用。

好处是,按原 JMS 及 pool 的配置属性进行配置即可,只增加了一层别名信息。配置属性不明确时,可以借助 Spring 的配置提示,看看有哪些配置属性。


//入口

BDMPActiveMQFactory.java

//ActiveMQ配置信息类

BDMPActiveMQProperties.java

//初始化

BDMPActiveMQConnectionFactoryConfiguration.java

//工厂

BDMPActiveMQConnectionFactoryFactory.java


public class BDMPActiveMQFactory {
private static BDMPActiveMQFactory instance;
static Map<String ,JmsMessagingTemplate> jmsMessagingTemplateMap = new ConcurrentHashMap<String ,JmsMessagingTemplate>();
static Map<String ,JmsTemplate> jmsTemplateMap = new ConcurrentHashMap<String ,JmsTemplate>();
static Map<String ,ActiveMQConnectionFactory> activeMQConnectionFactoryMap = new ConcurrentHashMap<String ,ActiveMQConnectionFactory>();
static Map<String ,JmsPoolConnectionFactory > jmsPoolConnectionFactoryMap = new ConcurrentHashMap<String ,JmsPoolConnectionFactory >();
private static Environment env;
public static BDMPActiveMQFactory getInstance() {
if(instance==null) {
synchronized (BDMPActiveMQFactory.class) {
instance = new BDMPActiveMQFactory();
if(env==null) {
env = SpringBeanUtil.getBean(Environment.class);
}
instance.init();
}
}
return instance;
}
private BDMPActiveMQFactory(){
}
private void init() {
Map<String ,BDMPActiveMQProperties> proMap = new ConcurrentHashMap<String, BDMPActiveMQProperties>();
//初始化配置信息
String activemqs = env.getProperty("activemq.names","");
if(!StringUtils.isEmpty(activemqs)) {
String[] names = activemqs.split(",");
for (int i = 0; i < names.length; i++) {
BDMPActiveMQProperties properties = new BDMPActiveMQProperties();
String brokerUrl = env.getProperty("activemq."+names[i]+".broker-url");
if (!StringUtils.isEmpty(brokerUrl)) {
properties.setBrokerUrl(brokerUrl);
}
properties.setUser(env.getProperty("activemq."+names[i]+".user",""));
properties.setPassword(env.getProperty("activemq."+names[i]+".password",""));
String sendTimeout = env.getProperty("activemq."+names[i]+".send-timeout","");
if(!StringUtils.isEmpty(sendTimeout)) {
try {
properties.setSendTimeout(Duration.ofMillis(Long.parseLong(sendTimeout)));
} catch (Exception e) {
e.printStackTrace();
}
}
String nonBlockingRedelivery = env.getProperty("activemq."+names[i]+".non-blocking-redelivery");
if(nonBlockingRedelivery!=null) {
properties.setNonBlockingRedelivery(Boolean.getBoolean(nonBlockingRedelivery));
}
String closeTimeout = env.getProperty("activemq."+names[i]+".close-timeout","");
if(!StringUtils.isEmpty(closeTimeout)) {
try {
properties.setCloseTimeout(Duration.ofMillis(Long.parseLong(closeTimeout)));
} catch (Exception e) {
e.printStackTrace();
}
}
String inMemory = env.getProperty("activemq."+names[i]+".in-memory");
if(inMemory!=null) {
properties.setInMemory(Boolean.getBoolean(inMemory));
}
String trustAll = env.getProperty("activemq."+names[i]+".packages.trust-all");
if(trustAll!=null) {
properties.getPackages().setTrustAll(Boolean.getBoolean(trustAll));
}
List<String> trusted = env.getProperty("activemq."+names[i]+".packages.trusted", ArrayList.class);
if(trusted!=null && trusted.size()>0) {
properties.getPackages().setTrusted(trusted);
}
String poolenabled =  env.getProperty("activemq."+names[i]+".pool.enabled");
if(poolenabled!=null && Boolean.getBoolean(poolenabled)) {
properties.getPool().setEnabled(true);
String maxConnections =  env.getProperty("activemq."+names[i]+".pool.max-connections"); 
if(!StringUtils.isEmpty(maxConnections)) {
properties.getPool().setMaxConnections(Integer.parseInt(maxConnections));
}
String idleTimeout =  env.getProperty("activemq."+names[i]+".pool.idle-timeout"); 
if(!StringUtils.isEmpty(idleTimeout)) {
properties.getPool().setIdleTimeout(Duration.ofSeconds(Long.parseLong(idleTimeout)));
}
properties.getPool().setBlockIfFull(Boolean.getBoolean(env.getProperty("activemq."+names[i]+".pool.block-if-full")));
String blockIfFullTimeout =  env.getProperty("activemq."+names[i]+".pool.block-if-full-timeout"); 
if(!StringUtils.isEmpty(blockIfFullTimeout)) {
properties.getPool().setBlockIfFullTimeout(Duration.ofMillis(Long.parseLong(blockIfFullTimeout)));
}
String maxSessionsPerConnection =  env.getProperty("activemq."+names[i]+".pool.max-sessions-per-connection"); 
if(!StringUtils.isEmpty(maxSessionsPerConnection)) {
properties.getPool().setMaxSessionsPerConnection(Integer.parseInt(maxSessionsPerConnection));
}
String timeBetweenExpirationCheck =  env.getProperty("activemq."+names[i]+".pool.time-between-expiration-check"); 
if(!StringUtils.isEmpty(timeBetweenExpirationCheck)) {
properties.getPool().setTimeBetweenExpirationCheck(Duration.ofMillis(Long.parseLong(timeBetweenExpirationCheck)));
}
String useAnonymousProducers = env.getProperty("activemq."+names[i]+".pool.use-anonymous-producers");
if(!StringUtils.isEmpty(useAnonymousProducers)) {
properties.getPool().setUseAnonymousProducers(Boolean.getBoolean(useAnonymousProducers));
}
}
proMap.put(names[i], properties);
}
}
BDMPActiveMQConnectionFactoryConfiguration cofig = new BDMPActiveMQConnectionFactoryConfiguration();
for (Map.Entry<String, BDMPActiveMQProperties> entry : proMap.entrySet()) {    
ConnectionFactory connectionFactory = null;
BDMPActiveMQProperties pro = entry.getValue();
if(pro.getPool().isEnabled()) {
connectionFactory = cofig.pooledJmsConnectionFactory(pro, null,activeMQConnectionFactoryMap,entry.getKey());
jmsPoolConnectionFactoryMap.put(entry.getKey(), (JmsPoolConnectionFactory) connectionFactory);
}else {
connectionFactory = cofig.jmsConnectionFactory(pro, null);
activeMQConnectionFactoryMap.put(entry.getKey(), (ActiveMQConnectionFactory) connectionFactory);
}
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(jmsTemplate);
jmsTemplateMap.put(entry.getKey(), jmsTemplate);
jmsMessagingTemplateMap.put(entry.getKey(), jmsMessagingTemplate);
}
}
public JmsMessagingTemplate getJmsMessagingTemplateByName(String name) {
return jmsMessagingTemplateMap.get(name);
}
public JmsTemplate getJmsTemplateByName(String name) {
return jmsTemplateMap.get(name);
}
public Map<String ,JmsTemplate> getJmsTemplateMap() {
return jmsTemplateMap;
}
public Map<String ,ActiveMQConnectionFactory> getActiveMQConnectionFactoryMap() {
return activeMQConnectionFactoryMap;
}
public ActiveMQConnectionFactory getActiveMQConnectionFactoryByName(String name) {
return activeMQConnectionFactoryMap.get(name);
}
}
class BDMPActiveMQProperties {
/**
 * URL of the ActiveMQ broker. Auto-generated by default.
 */
private String brokerUrl;
/**
 * Whether the default broker URL should be in memory. Ignored if an explicit broker
 * has been specified.
 */
private boolean inMemory = true;
/**
 * Login user of the broker.
 */
private String user;
/**
 * Login password of the broker.
 */
private String password;
/**
 * Time to wait before considering a close complete.
 */
private Duration closeTimeout = Duration.ofSeconds(15);
/**
 * Whether to stop message delivery before re-delivering messages from a rolled back
 * transaction. This implies that message order is not preserved when this is enabled.
 */
private boolean nonBlockingRedelivery = false;
/**
 * Time to wait on message sends for a response. Set it to 0 to wait forever.
 */
private Duration sendTimeout = Duration.ofMillis(0);
private JmsPoolConnectionFactoryProperties pool = new JmsPoolConnectionFactoryProperties();
private Packages packages = new Packages();
public String getBrokerUrl() {
return this.brokerUrl;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public boolean isInMemory() {
return this.inMemory;
}
public void setInMemory(boolean inMemory) {
this.inMemory = inMemory;
}
public String getUser() {
return this.user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return this.password;
}
public void setPassword(String password) {
this.password = password;
}
public Duration getCloseTimeout() {
return this.closeTimeout;
}
public void setCloseTimeout(Duration closeTimeout) {
this.closeTimeout = closeTimeout;
}
public boolean isNonBlockingRedelivery() {
return this.nonBlockingRedelivery;
}
public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
this.nonBlockingRedelivery = nonBlockingRedelivery;
}
public Duration getSendTimeout() {
return this.sendTimeout;
}
public void setSendTimeout(Duration sendTimeout) {
this.sendTimeout = sendTimeout;
}
JmsPoolConnectionFactoryProperties getPool() {
return this.pool;
}
Packages getPackages() {
return this.packages;
}
public static class Packages {
/**
 * Whether to trust all packages.
 */
private Boolean trustAll;
/**
 * Comma-separated list of specific packages to trust (when not trusting all
 * packages).
 */
private List<String> trusted = new ArrayList<>();
public Boolean getTrustAll() {
return this.trustAll;
}
public void setTrustAll(Boolean trustAll) {
this.trustAll = trustAll;
}
public List<String> getTrusted() {
return this.trusted;
}
public void setTrusted(List<String> trusted) {
this.trusted = trusted;
}
}
}
class BDMPActiveMQConnectionFactoryConfiguration {
ActiveMQConnectionFactory jmsConnectionFactory(BDMPActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
return createJmsConnectionFactory(properties, factoryCustomizers);
}
JmsPoolConnectionFactory pooledJmsConnectionFactory(BDMPActiveMQProperties pro,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers,Map factoryMap,String key) {
ActiveMQConnectionFactory connectionFactory = new BDMPActiveMQConnectionFactoryFactory(pro,
factoryCustomizers.orderedStream().collect(Collectors.toList()))
.createConnectionFactory(ActiveMQConnectionFactory.class);
factoryMap.put(key, connectionFactory);
return new JmsPoolConnectionFactoryFactory(pro.getPool())
.createPooledConnectionFactory(connectionFactory);
}
JmsPoolConnectionFactory pooledJmsConnectionFactory(BDMPActiveMQProperties pro,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
ActiveMQConnectionFactory connectionFactory = new BDMPActiveMQConnectionFactoryFactory(pro,
factoryCustomizers.orderedStream().collect(Collectors.toList()))
.createConnectionFactory(ActiveMQConnectionFactory.class);
return new JmsPoolConnectionFactoryFactory(pro.getPool())
.createPooledConnectionFactory(connectionFactory);
}
private static ActiveMQConnectionFactory createJmsConnectionFactory(BDMPActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
return new BDMPActiveMQConnectionFactoryFactory(properties,
factoryCustomizers == null?null:factoryCustomizers.orderedStream().collect(Collectors.toList()))
.createConnectionFactory(ActiveMQConnectionFactory.class);
}
}
class BDMPActiveMQConnectionFactoryFactory {
private static final String DEFAULT_EMBEDDED_BROKER_URL = "vm://localhost?broker.persistent=false";
private static final String DEFAULT_NETWORK_BROKER_URL = "tcp://localhost:61616";
private final BDMPActiveMQProperties properties;
private final List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers;
BDMPActiveMQConnectionFactoryFactory(BDMPActiveMQProperties pro,
List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
Assert.notNull(pro, "Properties must not be null");
this.properties = pro;
this.factoryCustomizers = (factoryCustomizers != null) ? factoryCustomizers : Collections.emptyList();
}
<T extends ActiveMQConnectionFactory> T createConnectionFactory(Class<T> factoryClass) {
try {
return doCreateConnectionFactory(factoryClass);
}
catch (Exception ex) {
throw new IllegalStateException("Unable to create ActiveMQConnectionFactory", ex);
}
}
private <T extends ActiveMQConnectionFactory> T doCreateConnectionFactory(Class<T> factoryClass) throws Exception {
T factory = createConnectionFactoryInstance(factoryClass);
if (this.properties.getCloseTimeout() != null) {
factory.setCloseTimeout((int) this.properties.getCloseTimeout().toMillis());
}
factory.setNonBlockingRedelivery(this.properties.isNonBlockingRedelivery());
if (this.properties.getSendTimeout() != null) {
factory.setSendTimeout((int) this.properties.getSendTimeout().toMillis());
}
Packages packages = this.properties.getPackages();
if (packages.getTrustAll() != null) {
factory.setTrustAllPackages(packages.getTrustAll());
}
if (!packages.getTrusted().isEmpty()) {
factory.setTrustedPackages(packages.getTrusted());
}
customize(factory);
return factory;
}
private <T extends ActiveMQConnectionFactory> T createConnectionFactoryInstance(Class<T> factoryClass)
throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
String brokerUrl = determineBrokerUrl();
String user = this.properties.getUser();
String password = this.properties.getPassword();
if (StringUtils.hasLength(user) && StringUtils.hasLength(password)) {
return factoryClass.getConstructor(String.class, String.class, String.class).newInstance(user, password,
brokerUrl);
}
return factoryClass.getConstructor(String.class).newInstance(brokerUrl);
}
private void customize(ActiveMQConnectionFactory connectionFactory) {
for (ActiveMQConnectionFactoryCustomizer factoryCustomizer : this.factoryCustomizers) {
factoryCustomizer.customize(connectionFactory);
}
}
String determineBrokerUrl() {
if (this.properties.getBrokerUrl() != null) {
return this.properties.getBrokerUrl();
}
if (this.properties.isInMemory()) {
return DEFAULT_EMBEDDED_BROKER_URL;
}
return DEFAULT_NETWORK_BROKER_URL;
}
}

配置文件

# Sample configuration read by BDMPActiveMQFactory.
activemq: 
  # Comma-separated list of aliases; each alias is read from the keys activemq.<alias>.*
  names: dx,wt,sx
  dx: 
    # Left empty: no broker URL is set, so the connection factory uses its default URL
    # (the embedded vm:// URL while in-memory keeps its default of true).
    broker-url: 
    user: 1
    password: 1
    pool:
      max-connections: 5
      # Requests a pooled connection factory for this alias.
      enabled: true
      # Parsed by BDMPActiveMQFactory with Duration.ofSeconds.
      idle-timeout: 3000
  wt:
    broker-url:  tcp://localhost:61616
    user: 1
    password: 1
    pool:
      max-connections: 5
      enabled: true
      idle-timeout: 3000
  sx:
    broker-url:  tcp://localhost:61616
    user: 1
    password: 1
    pool:
      max-connections: 5
      enabled: true
      idle-timeout: 3000




乐享:知识积累,快乐无限。