001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.File;
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.net.UnknownHostException;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.LinkedBlockingQueue;
034import java.util.concurrent.RejectedExecutionException;
035import java.util.concurrent.RejectedExecutionHandler;
036import java.util.concurrent.SynchronousQueue;
037import java.util.concurrent.ThreadFactory;
038import java.util.concurrent.ThreadPoolExecutor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041
042import javax.annotation.PostConstruct;
043import javax.annotation.PreDestroy;
044import javax.management.MalformedObjectNameException;
045import javax.management.ObjectName;
046
047import org.apache.activemq.ActiveMQConnectionMetaData;
048import org.apache.activemq.ConfigurationException;
049import org.apache.activemq.Service;
050import org.apache.activemq.advisory.AdvisoryBroker;
051import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
052import org.apache.activemq.broker.ft.MasterConnector;
053import org.apache.activemq.broker.jmx.AnnotatedMBean;
054import org.apache.activemq.broker.jmx.BrokerView;
055import org.apache.activemq.broker.jmx.ConnectorView;
056import org.apache.activemq.broker.jmx.ConnectorViewMBean;
057import org.apache.activemq.broker.jmx.FTConnectorView;
058import org.apache.activemq.broker.jmx.JmsConnectorView;
059import org.apache.activemq.broker.jmx.JobSchedulerView;
060import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
061import org.apache.activemq.broker.jmx.ManagedRegionBroker;
062import org.apache.activemq.broker.jmx.ManagementContext;
063import org.apache.activemq.broker.jmx.NetworkConnectorView;
064import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
065import org.apache.activemq.broker.jmx.ProxyConnectorView;
066import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
067import org.apache.activemq.broker.region.Destination;
068import org.apache.activemq.broker.region.DestinationFactory;
069import org.apache.activemq.broker.region.DestinationFactoryImpl;
070import org.apache.activemq.broker.region.DestinationInterceptor;
071import org.apache.activemq.broker.region.RegionBroker;
072import org.apache.activemq.broker.region.policy.PolicyMap;
073import org.apache.activemq.broker.region.virtual.MirroredQueue;
074import org.apache.activemq.broker.region.virtual.VirtualDestination;
075import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
076import org.apache.activemq.broker.region.virtual.VirtualTopic;
077import org.apache.activemq.broker.scheduler.SchedulerBroker;
078import org.apache.activemq.command.ActiveMQDestination;
079import org.apache.activemq.command.ActiveMQQueue;
080import org.apache.activemq.command.BrokerId;
081import org.apache.activemq.filter.DestinationFilter;
082import org.apache.activemq.network.ConnectionFilter;
083import org.apache.activemq.network.DiscoveryNetworkConnector;
084import org.apache.activemq.network.NetworkConnector;
085import org.apache.activemq.network.jms.JmsConnector;
086import org.apache.activemq.proxy.ProxyConnector;
087import org.apache.activemq.security.MessageAuthorizationPolicy;
088import org.apache.activemq.selector.SelectorParser;
089import org.apache.activemq.store.PersistenceAdapter;
090import org.apache.activemq.store.PersistenceAdapterFactory;
091import org.apache.activemq.store.amq.AMQPersistenceAdapter;
092import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
093import org.apache.activemq.store.kahadb.plist.PListStore;
094import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
095import org.apache.activemq.thread.Scheduler;
096import org.apache.activemq.thread.TaskRunnerFactory;
097import org.apache.activemq.transport.TransportFactory;
098import org.apache.activemq.transport.TransportServer;
099import org.apache.activemq.transport.vm.VMTransportFactory;
100import org.apache.activemq.usage.SystemUsage;
101import org.apache.activemq.util.BrokerSupport;
102import org.apache.activemq.util.DefaultIOExceptionHandler;
103import org.apache.activemq.util.IOExceptionHandler;
104import org.apache.activemq.util.IOExceptionSupport;
105import org.apache.activemq.util.IOHelper;
106import org.apache.activemq.util.InetAddressUtil;
107import org.apache.activemq.util.JMXSupport;
108import org.apache.activemq.util.ServiceStopper;
109import org.apache.activemq.util.URISupport;
110import org.slf4j.Logger;
111import org.slf4j.LoggerFactory;
112import org.slf4j.MDC;
113
114/**
115 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
116 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
118 *
119 *
120 * @org.apache.xbean.XBean
121 */
122public class BrokerService implements Service {
123    protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
124    public static final String DEFAULT_PORT = "61616";
125    public static final String LOCAL_HOST_NAME;
126    public static final String DEFAULT_BROKER_NAME = "localhost";
127    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
128    private static final long serialVersionUID = 7353129142305630237L;
129    private boolean useJmx = true;
130    private boolean enableStatistics = true;
131    private boolean persistent = true;
132    private boolean populateJMSXUserID;
133    private boolean useAuthenticatedPrincipalForJMSXUserID;
134    private boolean populateUserNameInMBeans;
135
136    private boolean useShutdownHook = true;
137    private boolean useLoggingForShutdownErrors;
138    private boolean shutdownOnMasterFailure;
139    private boolean shutdownOnSlaveFailure;
140    private boolean waitForSlave;
141    private long waitForSlaveTimeout = 600000L;
142    private boolean passiveSlave;
143    private String brokerName = DEFAULT_BROKER_NAME;
144    private File dataDirectoryFile;
145    private File tmpDataDirectory;
146    private Broker broker;
147    private BrokerView adminView;
148    private ManagementContext managementContext;
149    private ObjectName brokerObjectName;
150    private TaskRunnerFactory taskRunnerFactory;
151    private TaskRunnerFactory persistenceTaskRunnerFactory;
152    private SystemUsage systemUsage;
153    private SystemUsage producerSystemUsage;
154    private SystemUsage consumerSystemUsaage;
155    private PersistenceAdapter persistenceAdapter;
156    private PersistenceAdapterFactory persistenceFactory;
157    protected DestinationFactory destinationFactory;
158    private MessageAuthorizationPolicy messageAuthorizationPolicy;
159    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
160    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
161    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
162    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
163    private final List<Service> services = new ArrayList<Service>();
164    private MasterConnector masterConnector;
165    private String masterConnectorURI;
166    private transient Thread shutdownHook;
167    private String[] transportConnectorURIs;
168    private String[] networkConnectorURIs;
169    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
170    // to other jms messaging
171    // systems
172    private boolean deleteAllMessagesOnStartup;
173    private boolean advisorySupport = true;
174    private URI vmConnectorURI;
175    private String defaultSocketURIString;
176    private PolicyMap destinationPolicy;
177    private final AtomicBoolean started = new AtomicBoolean(false);
178    private final AtomicBoolean stopped = new AtomicBoolean(false);
179    private BrokerPlugin[] plugins;
180    private boolean keepDurableSubsActive = true;
181    private boolean useVirtualTopics = true;
182    private boolean useMirroredQueues = false;
183    private boolean useTempMirroredQueues = true;
184    private BrokerId brokerId;
185    private DestinationInterceptor[] destinationInterceptors;
186    private ActiveMQDestination[] destinations;
187    private PListStore tempDataStore;
188    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
189    private boolean useLocalHostBrokerName;
190    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
191    private final CountDownLatch startedLatch = new CountDownLatch(1);
192    private boolean supportFailOver;
193    private Broker regionBroker;
194    private int producerSystemUsagePortion = 60;
195    private int consumerSystemUsagePortion = 40;
196    private boolean splitSystemUsageForProducersConsumers;
197    private boolean monitorConnectionSplits = false;
198    private int taskRunnerPriority = Thread.NORM_PRIORITY;
199    private boolean dedicatedTaskRunner;
200    private boolean cacheTempDestinations = false;// useful for failover
201    private int timeBeforePurgeTempDestinations = 5000;
202    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
203    private boolean systemExitOnShutdown;
204    private int systemExitOnShutdownExitCode;
205    private SslContext sslContext;
206    private boolean forceStart = false;
207    private IOExceptionHandler ioExceptionHandler;
208    private boolean schedulerSupport = false;
209    private File schedulerDirectoryFile;
210    private Scheduler scheduler;
211    private ThreadPoolExecutor executor;
212    private boolean slave = true;
213    private int schedulePeriodForDestinationPurge= 0;
214    private int maxPurgedDestinationsPerSweep = 0;
215    private BrokerContext brokerContext;
216    private boolean networkConnectorStartAsync = false;
217    private boolean allowTempAutoCreationOnSend;
218
219    private int offlineDurableSubscriberTimeout = -1;
220    private int offlineDurableSubscriberTaskSchedule = 300000;
221    private DestinationFilter virtualConsumerDestinationFilter;
222
223    static {
224        String localHostName = "localhost";
225        try {
226            localHostName =  InetAddressUtil.getLocalHostName();
227        } catch (UnknownHostException e) {
228            LOG.error("Failed to resolve localhost");
229        }
230        LOCAL_HOST_NAME = localHostName;
231    }
232
233    @Override
234    public String toString() {
235        return "BrokerService[" + getBrokerName() + "]";
236    }
237
238    /**
239     * Adds a new transport connector for the given bind address
240     *
241     * @return the newly created and added transport connector
242     * @throws Exception
243     */
244    public TransportConnector addConnector(String bindAddress) throws Exception {
245        return addConnector(new URI(bindAddress));
246    }
247
248    /**
249     * Adds a new transport connector for the given bind address
250     *
251     * @return the newly created and added transport connector
252     * @throws Exception
253     */
254    public TransportConnector addConnector(URI bindAddress) throws Exception {
255        return addConnector(createTransportConnector(bindAddress));
256    }
257
258    /**
259     * Adds a new transport connector for the given TransportServer transport
260     *
261     * @return the newly created and added transport connector
262     * @throws Exception
263     */
264    public TransportConnector addConnector(TransportServer transport) throws Exception {
265        return addConnector(new TransportConnector(transport));
266    }
267
268    /**
269     * Adds a new transport connector
270     *
271     * @return the transport connector
272     * @throws Exception
273     */
274    public TransportConnector addConnector(TransportConnector connector) throws Exception {
275        transportConnectors.add(connector);
276        return connector;
277    }
278
279    /**
280     * Stops and removes a transport connector from the broker.
281     *
282     * @param connector
283     * @return true if the connector has been previously added to the broker
284     * @throws Exception
285     */
286    public boolean removeConnector(TransportConnector connector) throws Exception {
287        boolean rc = transportConnectors.remove(connector);
288        if (rc) {
289            unregisterConnectorMBean(connector);
290        }
291        return rc;
292    }
293
294    /**
295     * Adds a new network connector using the given discovery address
296     *
297     * @return the newly created and added network connector
298     * @throws Exception
299     */
300    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
301        return addNetworkConnector(new URI(discoveryAddress));
302    }
303
304    /**
305     * Adds a new proxy connector using the given bind address
306     *
307     * @return the newly created and added network connector
308     * @throws Exception
309     */
310    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
311        return addProxyConnector(new URI(bindAddress));
312    }
313
314    /**
315     * Adds a new network connector using the given discovery address
316     *
317     * @return the newly created and added network connector
318     * @throws Exception
319     */
320    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
321        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
322        return addNetworkConnector(connector);
323    }
324
325    /**
326     * Adds a new proxy connector using the given bind address
327     *
328     * @return the newly created and added network connector
329     * @throws Exception
330     */
331    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
332        ProxyConnector connector = new ProxyConnector();
333        connector.setBind(bindAddress);
334        connector.setRemote(new URI("fanout:multicast://default"));
335        return addProxyConnector(connector);
336    }
337
338    /**
339     * Adds a new network connector to connect this broker to a federated
340     * network
341     */
342    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
343        connector.setBrokerService(this);
344        URI uri = getVmConnectorURI();
345        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
346        map.put("network", "true");
347        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
348        connector.setLocalUri(uri);
349        // Set a connection filter so that the connector does not establish loop
350        // back connections.
351        connector.setConnectionFilter(new ConnectionFilter() {
352            public boolean connectTo(URI location) {
353                List<TransportConnector> transportConnectors = getTransportConnectors();
354                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
355                    try {
356                        TransportConnector tc = iter.next();
357                        if (location.equals(tc.getConnectUri())) {
358                            return false;
359                        }
360                    } catch (Throwable e) {
361                    }
362                }
363                return true;
364            }
365        });
366        networkConnectors.add(connector);
367        if (isUseJmx()) {
368            registerNetworkConnectorMBean(connector);
369        }
370        return connector;
371    }
372
373    /**
374     * Removes the given network connector without stopping it. The caller
375     * should call {@link NetworkConnector#stop()} to close the connector
376     */
377    public boolean removeNetworkConnector(NetworkConnector connector) {
378        boolean answer = networkConnectors.remove(connector);
379        if (answer) {
380            unregisterNetworkConnectorMBean(connector);
381        }
382        return answer;
383    }
384
385    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
386        URI uri = getVmConnectorURI();
387        connector.setLocalUri(uri);
388        proxyConnectors.add(connector);
389        if (isUseJmx()) {
390            registerProxyConnectorMBean(connector);
391        }
392        return connector;
393    }
394
395    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
396        connector.setBrokerService(this);
397        jmsConnectors.add(connector);
398        if (isUseJmx()) {
399            registerJmsConnectorMBean(connector);
400        }
401        return connector;
402    }
403
404    public JmsConnector removeJmsConnector(JmsConnector connector) {
405        if (jmsConnectors.remove(connector)) {
406            return connector;
407        }
408        return null;
409    }
410
411    /**
412     * @return Returns the masterConnectorURI.
413     */
414    public String getMasterConnectorURI() {
415        return masterConnectorURI;
416    }
417
418    /**
419     * @param masterConnectorURI
420     *            The masterConnectorURI to set.
421     */
422    public void setMasterConnectorURI(String masterConnectorURI) {
423        this.masterConnectorURI = masterConnectorURI;
424    }
425
426    /**
427     * @return true if this Broker is a slave to a Master
428     */
429    public boolean isSlave() {
430        return (masterConnector != null && masterConnector.isSlave()) ||
431            (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
432            (masterConnector == null && slave);
433    }
434
435    public void masterFailed() {
436        if (shutdownOnMasterFailure) {
437            LOG.error("The Master has failed ... shutting down");
438            try {
439                stop();
440            } catch (Exception e) {
441                LOG.error("Failed to stop for master failure", e);
442            }
443        } else {
444            LOG.warn("Master Failed - starting all connectors");
445            try {
446                startAllConnectors();
447                broker.nowMasterBroker();
448            } catch (Exception e) {
449                LOG.error("Failed to startAllConnectors", e);
450            }
451        }
452    }
453
454    public boolean isStarted() {
455        return started.get();
456    }
457
458    /**
459     * Forces a start of the broker.
460     * By default a BrokerService instance that was
461     * previously stopped using BrokerService.stop() cannot be restarted
462     * using BrokerService.start().
463     * This method enforces a restart.
464     * It is not recommended to force a restart of the broker and will not work
465     * for most but some very trivial broker configurations.
466     * For restarting a broker instance we recommend to first call stop() on
467     * the old instance and then recreate a new BrokerService instance.
468     *
469     * @param force - if true enforces a restart.
470     * @throws Exception
471     */
472    public void start(boolean force) throws Exception {
473        forceStart = force;
474        stopped.set(false);
475        started.set(false);
476        start();
477    }
478
479    // Service interface
480    // -------------------------------------------------------------------------
481
482    protected boolean shouldAutostart() {
483        return true;
484    }
485
486    /**
487     *
488     * @throws Exception
489     * @org. apache.xbean.InitMethod
490     */
491    @PostConstruct
492    public void autoStart() throws Exception {
493        if(shouldAutostart()) {
494            start();
495        }
496    }
497
498    public void start() throws Exception {
499        if (stopped.get() || !started.compareAndSet(false, true)) {
500            // lets just ignore redundant start() calls
501            // as its way too easy to not be completely sure if start() has been
502            // called or not with the gazillion of different configuration
503            // mechanisms
504            // throw new IllegalStateException("Allready started.");
505            return;
506        }
507
508        MDC.put("activemq.broker", brokerName);
509
510        try {
511            if (systemExitOnShutdown && useShutdownHook) {
512                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
513            }
514            processHelperProperties();
515            if (isUseJmx()) {
516                startManagementContext();
517            }
518
519            getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
520            getPersistenceAdapter().setBrokerName(getBrokerName());
521            LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
522            if (deleteAllMessagesOnStartup) {
523                deleteAllMessages();
524            }
525            getPersistenceAdapter().start();
526            slave = false;
527            startDestinations();
528            addShutdownHook();
529            getBroker().start();
530            if (isUseJmx()) {
531                if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
532                    // try to restart management context
533                    // typical for slaves that use the same ports as master
534                    managementContext.stop();
535                    startManagementContext();
536                }
537                ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
538                managedBroker.setContextBroker(broker);
539                adminView.setBroker(managedBroker);
540            }
541            BrokerRegistry.getInstance().bind(getBrokerName(), this);
542            // see if there is a MasterBroker service and if so, configure
543            // it and start it.
544            for (Service service : services) {
545                if (service instanceof MasterConnector) {
546                    configureService(service);
547                    service.start();
548                }
549            }
550            if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
551                startAllConnectors();
552            }
553            if (!stopped.get()) {
554                if (isUseJmx() && masterConnector != null) {
555                    registerFTConnectorMBean(masterConnector);
556                }
557            }
558            if (brokerId == null) {
559                brokerId = broker.getBrokerId();
560            }
561            if (ioExceptionHandler == null) {
562                setIoExceptionHandler(new DefaultIOExceptionHandler());
563            }
564            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
565            getBroker().brokerServiceStarted();
566            checkSystemUsageLimits();
567            startedLatch.countDown();
568        } catch (Exception e) {
569            LOG.error("Failed to start ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
570            try {
571                if (!stopped.get()) {
572                    stop();
573                }
574            } catch (Exception ex) {
575                LOG.warn("Failed to stop broker after failure in start ", ex);
576            }
577            throw e;
578        } finally {
579            MDC.remove("activemq.broker");
580        }
581    }
582
583    /**
584     *
585     * @throws Exception
586     * @org.apache .xbean.DestroyMethod
587     */
588    @PreDestroy
589    public void stop() throws Exception {
590        if (!started.get()) {
591            return;
592        }
593
594        MDC.put("activemq.broker", brokerName);
595
596        if (systemExitOnShutdown) {
597            new Thread() {
598                @Override
599                public void run() {
600                    System.exit(systemExitOnShutdownExitCode);
601                }
602            }.start();
603        }
604
605        LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
606        removeShutdownHook();
607        if (this.scheduler != null) {
608            this.scheduler.stop();
609            this.scheduler = null;
610        }
611        ServiceStopper stopper = new ServiceStopper();
612        if (services != null) {
613            for (Service service : services) {
614                stopper.stop(service);
615            }
616        }
617        stopAllConnectors(stopper);
618        // remove any VMTransports connected
619        // this has to be done after services are stopped,
620        // to avoid timimg issue with discovery (spinning up a new instance)
621        BrokerRegistry.getInstance().unbind(getBrokerName());
622        VMTransportFactory.stopped(getBrokerName());
623        if (broker != null) {
624            stopper.stop(broker);
625            broker = null;
626        }
627
628        if (tempDataStore != null) {
629            tempDataStore.stop();
630            tempDataStore = null;
631        }
632        try {
633            stopper.stop(persistenceAdapter);
634            persistenceAdapter = null;
635            slave = true;
636            if (isUseJmx()) {
637                stopper.stop(getManagementContext());
638                managementContext = null;
639            }
640            // Clear SelectorParser cache to free memory
641            SelectorParser.clearCache();
642        } finally {
643            stopped.set(true);
644            stoppedLatch.countDown();
645        }
646        if (masterConnectorURI == null) {
647            // master start has not finished yet
648            if (slaveStartSignal.getCount() == 1) {
649                started.set(false);
650                slaveStartSignal.countDown();
651            }
652        } else {
653            for (Service service : services) {
654                if (service instanceof MasterConnector) {
655                    MasterConnector mConnector = (MasterConnector) service;
656                    if (!mConnector.isSlave()) {
657                        // means should be slave but not connected to master yet
658                        started.set(false);
659                        mConnector.stopBeforeConnected();
660                    }
661                }
662            }
663        }
664        if (this.taskRunnerFactory != null) {
665            this.taskRunnerFactory.shutdown();
666            this.taskRunnerFactory = null;
667        }
668        if (this.executor != null) {
669            this.executor.shutdownNow();
670            this.executor = null;
671        }
672
673        this.destinationInterceptors = null;
674        this.destinationFactory = null;
675
676        LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
677        synchronized (shutdownHooks) {
678            for (Runnable hook : shutdownHooks) {
679                try {
680                    hook.run();
681                } catch (Throwable e) {
682                    stopper.onException(hook, e);
683                }
684            }
685        }
686
687        MDC.remove("activemq.broker");
688
689        stopper.throwFirstException();
690    }
691
692    public boolean checkQueueSize(String queueName) {
693        long count = 0;
694        long queueSize = 0;
695        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
696        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
697            if (entry.getKey().isQueue()) {
698                if (entry.getValue().getName().matches(queueName)) {
699                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
700                    count += queueSize;
701                    if (queueSize > 0) {
702                        LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
703                                + queueSize);
704                    }
705                }
706            }
707        }
708        return count == 0;
709    }
710
711    /**
712     * This method (both connectorName and queueName are using regex to match)
713     * 1. stop the connector (supposed the user input the connector which the
714     * clients connect to) 2. to check whether there is any pending message on
715     * the queues defined by queueName 3. supposedly, after stop the connector,
716     * client should failover to other broker and pending messages should be
717     * forwarded. if no pending messages, the method finally call stop to stop
718     * the broker.
719     *
720     * @param connectorName
721     * @param queueName
722     * @param timeout
723     * @param pollInterval
724     * @throws Exception
725     */
726    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
727            throws Exception {
728        if (isUseJmx()) {
729            if (connectorName == null || queueName == null || timeout <= 0) {
730                throw new Exception(
731                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
732            }
733            if (pollInterval <= 0) {
734                pollInterval = 30;
735            }
736            LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
737                    + timeout + " pollInterval:" + pollInterval);
738            TransportConnector connector;
739            for (int i = 0; i < transportConnectors.size(); i++) {
740                connector = transportConnectors.get(i);
741                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
742                    connector.stop();
743                }
744            }
745            long start = System.currentTimeMillis();
746            while (System.currentTimeMillis() - start < timeout * 1000) {
747                // check quesize until it gets zero
748                if (checkQueueSize(queueName)) {
749                    stop();
750                    break;
751                } else {
752                    Thread.sleep(pollInterval * 1000);
753                }
754            }
755            if (stopped.get()) {
756                LOG.info("Successfully stop the broker.");
757            } else {
758                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
759            }
760        }
761    }
762
763    /**
764     * A helper method to block the caller thread until the broker has been
765     * stopped
766     */
767    public void waitUntilStopped() {
768        while (isStarted() && !stopped.get()) {
769            try {
770                stoppedLatch.await();
771            } catch (InterruptedException e) {
772                // ignore
773            }
774        }
775    }
776
777    /**
778     * A helper method to block the caller thread until the broker has fully started
779     * @return boolean true if wait succeeded false if broker was not started or was stopped
780     */
781    public boolean waitUntilStarted() {
782        boolean waitSucceeded = false;
783        while (isStarted() && !stopped.get() && !waitSucceeded) {
784            try {
785                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
786            } catch (InterruptedException ignore) {
787            }
788        }
789        return waitSucceeded;
790    }
791
792    // Properties
793    // -------------------------------------------------------------------------
794    /**
795     * Returns the message broker
796     */
797    public Broker getBroker() throws Exception {
798        if (broker == null) {
799            LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
800                    + getBrokerName() + ") is starting");
801            LOG.info("For help or more information please see: http://activemq.apache.org/");
802            broker = createBroker();
803        }
804        return broker;
805    }
806
807    /**
808     * Returns the administration view of the broker; used to create and destroy
809     * resources such as queues and topics. Note this method returns null if JMX
810     * is disabled.
811     */
812    public BrokerView getAdminView() throws Exception {
813        if (adminView == null) {
814            // force lazy creation
815            getBroker();
816        }
817        return adminView;
818    }
819
820    public void setAdminView(BrokerView adminView) {
821        this.adminView = adminView;
822    }
823
824    public String getBrokerName() {
825        return brokerName;
826    }
827
828    /**
829     * Sets the name of this broker; which must be unique in the network
830     *
831     * @param brokerName
832     */
833    public void setBrokerName(String brokerName) {
834        if (brokerName == null) {
835            throw new NullPointerException("The broker name cannot be null");
836        }
837        String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
838        if (!str.equals(brokerName)) {
839            LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
840        }
841        this.brokerName = str.trim();
842    }
843
844    public PersistenceAdapterFactory getPersistenceFactory() {
845        return persistenceFactory;
846    }
847
848    public File getDataDirectoryFile() {
849        if (dataDirectoryFile == null) {
850            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
851        }
852        return dataDirectoryFile;
853    }
854
855    public File getBrokerDataDirectory() {
856        String brokerDir = getBrokerName();
857        return new File(getDataDirectoryFile(), brokerDir);
858    }
859
860    /**
861     * Sets the directory in which the data files will be stored by default for
862     * the JDBC and Journal persistence adaptors.
863     *
864     * @param dataDirectory
865     *            the directory to store data files
866     */
867    public void setDataDirectory(String dataDirectory) {
868        setDataDirectoryFile(new File(dataDirectory));
869    }
870
871    /**
872     * Sets the directory in which the data files will be stored by default for
873     * the JDBC and Journal persistence adaptors.
874     *
875     * @param dataDirectoryFile
876     *            the directory to store data files
877     */
878    public void setDataDirectoryFile(File dataDirectoryFile) {
879        this.dataDirectoryFile = dataDirectoryFile;
880    }
881
882    /**
883     * @return the tmpDataDirectory
884     */
885    public File getTmpDataDirectory() {
886        if (tmpDataDirectory == null) {
887            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
888        }
889        return tmpDataDirectory;
890    }
891
892    /**
893     * @param tmpDataDirectory
894     *            the tmpDataDirectory to set
895     */
896    public void setTmpDataDirectory(File tmpDataDirectory) {
897        this.tmpDataDirectory = tmpDataDirectory;
898    }
899
900    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
901        this.persistenceFactory = persistenceFactory;
902    }
903
904    public void setDestinationFactory(DestinationFactory destinationFactory) {
905        this.destinationFactory = destinationFactory;
906    }
907
908    public boolean isPersistent() {
909        return persistent;
910    }
911
912    /**
913     * Sets whether or not persistence is enabled or disabled.
914     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
915     */
916    public void setPersistent(boolean persistent) {
917        this.persistent = persistent;
918    }
919
920    public boolean isPopulateJMSXUserID() {
921        return populateJMSXUserID;
922    }
923
924    /**
925     * Sets whether or not the broker should populate the JMSXUserID header.
926     */
927    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
928        this.populateJMSXUserID = populateJMSXUserID;
929    }
930
931    public SystemUsage getSystemUsage() {
932        try {
933            if (systemUsage == null) {
934                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
935                systemUsage.setExecutor(getExecutor());
936                systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
937                                                                         // 64
938                                                                         // Meg
939                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1000 * 50); // 50
940                                                                                // Gb
941                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1000 * 100); // 100
942                                                                                 // GB
943                addService(this.systemUsage);
944            }
945            return systemUsage;
946        } catch (IOException e) {
947            LOG.error("Cannot create SystemUsage", e);
948            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
949        }
950    }
951
952    public void setSystemUsage(SystemUsage memoryManager) {
953        if (this.systemUsage != null) {
954            removeService(this.systemUsage);
955        }
956        this.systemUsage = memoryManager;
957        if (this.systemUsage.getExecutor()==null) {
958            this.systemUsage.setExecutor(getExecutor());
959        }
960        addService(this.systemUsage);
961    }
962
963    /**
964     * @return the consumerUsageManager
965     * @throws IOException
966     */
967    public SystemUsage getConsumerSystemUsage() throws IOException {
968        if (this.consumerSystemUsaage == null) {
969            if (splitSystemUsageForProducersConsumers) {
970                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
971                float portion = consumerSystemUsagePortion / 100f;
972                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
973                addService(this.consumerSystemUsaage);
974            } else {
975                consumerSystemUsaage = getSystemUsage();
976            }
977        }
978        return this.consumerSystemUsaage;
979    }
980
981    /**
982     * @param consumerSystemUsaage
983     *            the storeSystemUsage to set
984     */
985    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
986        if (this.consumerSystemUsaage != null) {
987            removeService(this.consumerSystemUsaage);
988        }
989        this.consumerSystemUsaage = consumerSystemUsaage;
990        addService(this.consumerSystemUsaage);
991    }
992
993    /**
994     * @return the producerUsageManager
995     * @throws IOException
996     */
997    public SystemUsage getProducerSystemUsage() throws IOException {
998        if (producerSystemUsage == null) {
999            if (splitSystemUsageForProducersConsumers) {
1000                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1001                float portion = producerSystemUsagePortion / 100f;
1002                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1003                addService(producerSystemUsage);
1004            } else {
1005                producerSystemUsage = getSystemUsage();
1006            }
1007        }
1008        return producerSystemUsage;
1009    }
1010
1011    /**
1012     * @param producerUsageManager
1013     *            the producerUsageManager to set
1014     */
1015    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1016        if (this.producerSystemUsage != null) {
1017            removeService(this.producerSystemUsage);
1018        }
1019        this.producerSystemUsage = producerUsageManager;
1020        addService(this.producerSystemUsage);
1021    }
1022
1023    public PersistenceAdapter getPersistenceAdapter() throws IOException {
1024        if (persistenceAdapter == null) {
1025            persistenceAdapter = createPersistenceAdapter();
1026            configureService(persistenceAdapter);
1027            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1028        }
1029        return persistenceAdapter;
1030    }
1031
1032    /**
1033     * Sets the persistence adaptor implementation to use for this broker
1034     *
1035     * @throws IOException
1036     */
1037    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1038        this.persistenceAdapter = persistenceAdapter;
1039        configureService(this.persistenceAdapter);
1040        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1041    }
1042
1043    public TaskRunnerFactory getTaskRunnerFactory() {
1044        if (this.taskRunnerFactory == null) {
1045            this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1046                    isDedicatedTaskRunner());
1047        }
1048        return this.taskRunnerFactory;
1049    }
1050
1051    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1052        this.taskRunnerFactory = taskRunnerFactory;
1053    }
1054
1055    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1056        if (taskRunnerFactory == null) {
1057            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1058                    true, 1000, isDedicatedTaskRunner());
1059        }
1060        return persistenceTaskRunnerFactory;
1061    }
1062
1063    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1064        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1065    }
1066
1067    public boolean isUseJmx() {
1068        return useJmx;
1069    }
1070
1071    public boolean isEnableStatistics() {
1072        return enableStatistics;
1073    }
1074
1075    /**
1076     * Sets whether or not the Broker's services enable statistics or not.
1077     */
1078    public void setEnableStatistics(boolean enableStatistics) {
1079        this.enableStatistics = enableStatistics;
1080    }
1081
1082    /**
1083     * Sets whether or not the Broker's services should be exposed into JMX or
1084     * not.
1085     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1086     */
1087    public void setUseJmx(boolean useJmx) {
1088        this.useJmx = useJmx;
1089    }
1090
1091    public ObjectName getBrokerObjectName() throws IOException {
1092        if (brokerObjectName == null) {
1093            brokerObjectName = createBrokerObjectName();
1094        }
1095        return brokerObjectName;
1096    }
1097
1098    /**
1099     * Sets the JMX ObjectName for this broker
1100     */
1101    public void setBrokerObjectName(ObjectName brokerObjectName) {
1102        this.brokerObjectName = brokerObjectName;
1103    }
1104
1105    public ManagementContext getManagementContext() {
1106        if (managementContext == null) {
1107            managementContext = new ManagementContext();
1108        }
1109        return managementContext;
1110    }
1111
1112    public void setManagementContext(ManagementContext managementContext) {
1113        this.managementContext = managementContext;
1114    }
1115
1116    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1117        for (NetworkConnector connector : networkConnectors) {
1118            if (connector.getName().equals(connectorName)) {
1119                return connector;
1120            }
1121        }
1122        return null;
1123    }
1124
1125    public String[] getNetworkConnectorURIs() {
1126        return networkConnectorURIs;
1127    }
1128
1129    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1130        this.networkConnectorURIs = networkConnectorURIs;
1131    }
1132
1133    public TransportConnector getConnectorByName(String connectorName) {
1134        for (TransportConnector connector : transportConnectors) {
1135            if (connector.getName().equals(connectorName)) {
1136                return connector;
1137            }
1138        }
1139        return null;
1140    }
1141
1142    public Map<String, String> getTransportConnectorURIsAsMap() {
1143        Map<String, String> answer = new HashMap<String, String>();
1144        for (TransportConnector connector : transportConnectors) {
1145            try {
1146                URI uri = connector.getConnectUri();
1147                if (uri != null) {
1148                    String scheme = uri.getScheme();
1149                    if (scheme != null) {
1150                        answer.put(scheme.toLowerCase(), uri.toString());
1151                    }
1152                }
1153            } catch (Exception e) {
1154                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1155            }
1156        }
1157        return answer;
1158    }
1159
1160    public String[] getTransportConnectorURIs() {
1161        return transportConnectorURIs;
1162    }
1163
1164    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1165        this.transportConnectorURIs = transportConnectorURIs;
1166    }
1167
1168    /**
1169     * @return Returns the jmsBridgeConnectors.
1170     */
1171    public JmsConnector[] getJmsBridgeConnectors() {
1172        return jmsBridgeConnectors;
1173    }
1174
1175    /**
1176     * @param jmsConnectors
1177     *            The jmsBridgeConnectors to set.
1178     */
1179    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1180        this.jmsBridgeConnectors = jmsConnectors;
1181    }
1182
1183    public Service[] getServices() {
1184        return services.toArray(new Service[0]);
1185    }
1186
1187    /**
1188     * Sets the services associated with this broker such as a
1189     * {@link MasterConnector}
1190     */
1191    public void setServices(Service[] services) {
1192        this.services.clear();
1193        if (services != null) {
1194            for (int i = 0; i < services.length; i++) {
1195                this.services.add(services[i]);
1196            }
1197        }
1198    }
1199
1200    /**
1201     * Adds a new service so that it will be started as part of the broker
1202     * lifecycle
1203     */
1204    public void addService(Service service) {
1205        services.add(service);
1206    }
1207
1208    public void removeService(Service service) {
1209        services.remove(service);
1210    }
1211
1212    public boolean isUseLoggingForShutdownErrors() {
1213        return useLoggingForShutdownErrors;
1214    }
1215
1216    /**
1217     * Sets whether or not we should use commons-logging when reporting errors
1218     * when shutting down the broker
1219     */
1220    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1221        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1222    }
1223
1224    public boolean isUseShutdownHook() {
1225        return useShutdownHook;
1226    }
1227
1228    /**
1229     * Sets whether or not we should use a shutdown handler to close down the
1230     * broker cleanly if the JVM is terminated. It is recommended you leave this
1231     * enabled.
1232     */
1233    public void setUseShutdownHook(boolean useShutdownHook) {
1234        this.useShutdownHook = useShutdownHook;
1235    }
1236
1237    public boolean isAdvisorySupport() {
1238        return advisorySupport;
1239    }
1240
1241    /**
1242     * Allows the support of advisory messages to be disabled for performance
1243     * reasons.
1244     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1245     */
1246    public void setAdvisorySupport(boolean advisorySupport) {
1247        this.advisorySupport = advisorySupport;
1248    }
1249
1250    public List<TransportConnector> getTransportConnectors() {
1251        return new ArrayList<TransportConnector>(transportConnectors);
1252    }
1253
1254    /**
1255     * Sets the transport connectors which this broker will listen on for new
1256     * clients
1257     *
1258     * @org.apache.xbean.Property
1259     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1260     */
1261    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1262        for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1263            TransportConnector connector = iter.next();
1264            addConnector(connector);
1265        }
1266    }
1267
1268    public TransportConnector getTransportConnectorByName(String name){
1269        for (TransportConnector transportConnector:transportConnectors){
1270           if (name.equals(transportConnector.getName())){
1271               return transportConnector;
1272           }
1273        }
1274        return null;
1275    }
1276
1277    public TransportConnector getTransportConnectorByScheme(String scheme){
1278        for (TransportConnector transportConnector:transportConnectors){
1279            if (scheme.equals(transportConnector.getUri().getScheme())){
1280                return transportConnector;
1281            }
1282        }
1283        return null;
1284    }
1285
1286    public List<NetworkConnector> getNetworkConnectors() {
1287        return new ArrayList<NetworkConnector>(networkConnectors);
1288    }
1289
1290    public List<ProxyConnector> getProxyConnectors() {
1291        return new ArrayList<ProxyConnector>(proxyConnectors);
1292    }
1293
1294    /**
1295     * Sets the network connectors which this broker will use to connect to
1296     * other brokers in a federated network
1297     *
1298     * @org.apache.xbean.Property
1299     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1300     */
1301    public void setNetworkConnectors(List networkConnectors) throws Exception {
1302        for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1303            NetworkConnector connector = (NetworkConnector) iter.next();
1304            addNetworkConnector(connector);
1305        }
1306    }
1307
1308    /**
1309     * Sets the network connectors which this broker will use to connect to
1310     * other brokers in a federated network
1311     */
1312    public void setProxyConnectors(List proxyConnectors) throws Exception {
1313        for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1314            ProxyConnector connector = (ProxyConnector) iter.next();
1315            addProxyConnector(connector);
1316        }
1317    }
1318
1319    public PolicyMap getDestinationPolicy() {
1320        return destinationPolicy;
1321    }
1322
1323    /**
1324     * Sets the destination specific policies available either for exact
1325     * destinations or for wildcard areas of destinations.
1326     */
1327    public void setDestinationPolicy(PolicyMap policyMap) {
1328        this.destinationPolicy = policyMap;
1329    }
1330
1331    public BrokerPlugin[] getPlugins() {
1332        return plugins;
1333    }
1334
1335    /**
1336     * Sets a number of broker plugins to install such as for security
1337     * authentication or authorization
1338     */
1339    public void setPlugins(BrokerPlugin[] plugins) {
1340        this.plugins = plugins;
1341    }
1342
1343    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1344        return messageAuthorizationPolicy;
1345    }
1346
1347    /**
1348     * Sets the policy used to decide if the current connection is authorized to
1349     * consume a given message
1350     */
1351    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1352        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1353    }
1354
1355    /**
1356     * Delete all messages from the persistent store
1357     *
1358     * @throws IOException
1359     */
1360    public void deleteAllMessages() throws IOException {
1361        getPersistenceAdapter().deleteAllMessages();
1362    }
1363
1364    public boolean isDeleteAllMessagesOnStartup() {
1365        return deleteAllMessagesOnStartup;
1366    }
1367
1368    /**
1369     * Sets whether or not all messages are deleted on startup - mostly only
1370     * useful for testing.
1371     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1372     */
1373    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1374        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1375    }
1376
1377    public URI getVmConnectorURI() {
1378        if (vmConnectorURI == null) {
1379            try {
1380                vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1381            } catch (URISyntaxException e) {
1382                LOG.error("Badly formed URI from " + getBrokerName(), e);
1383            }
1384        }
1385        return vmConnectorURI;
1386    }
1387
1388    public void setVmConnectorURI(URI vmConnectorURI) {
1389        this.vmConnectorURI = vmConnectorURI;
1390    }
1391
1392    public String getDefaultSocketURIString() {
1393
1394            if (started.get()) {
1395                if (this.defaultSocketURIString == null) {
1396                    for (TransportConnector tc:this.transportConnectors) {
1397                        String result = null;
1398                        try {
1399                            result = tc.getPublishableConnectString();
1400                        } catch (Exception e) {
1401                          LOG.warn("Failed to get the ConnectURI for "+tc,e);
1402                        }
1403                        if (result != null) {
1404                            // find first publishable uri
1405                            if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1406                                this.defaultSocketURIString = result;
1407                                break;
1408                            } else {
1409                            // or use the first defined
1410                                if (this.defaultSocketURIString == null) {
1411                                    this.defaultSocketURIString = result;
1412                                }
1413                            }
1414                        }
1415                    }
1416
1417                }
1418                return this.defaultSocketURIString;
1419            }
1420       return null;
1421    }
1422
1423    /**
1424     * @return Returns the shutdownOnMasterFailure.
1425     */
1426    public boolean isShutdownOnMasterFailure() {
1427        return shutdownOnMasterFailure;
1428    }
1429
1430    /**
1431     * @param shutdownOnMasterFailure
1432     *            The shutdownOnMasterFailure to set.
1433     */
1434    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1435        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1436    }
1437
1438    public boolean isKeepDurableSubsActive() {
1439        return keepDurableSubsActive;
1440    }
1441
1442    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1443        this.keepDurableSubsActive = keepDurableSubsActive;
1444    }
1445
1446    public boolean isUseVirtualTopics() {
1447        return useVirtualTopics;
1448    }
1449
1450    /**
1451     * Sets whether or not <a
1452     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1453     * Topics</a> should be supported by default if they have not been
1454     * explicitly configured.
1455     */
1456    public void setUseVirtualTopics(boolean useVirtualTopics) {
1457        this.useVirtualTopics = useVirtualTopics;
1458    }
1459
1460    public DestinationInterceptor[] getDestinationInterceptors() {
1461        return destinationInterceptors;
1462    }
1463
1464    public boolean isUseMirroredQueues() {
1465        return useMirroredQueues;
1466    }
1467
1468    /**
1469     * Sets whether or not <a
1470     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1471     * Queues</a> should be supported by default if they have not been
1472     * explicitly configured.
1473     */
1474    public void setUseMirroredQueues(boolean useMirroredQueues) {
1475        this.useMirroredQueues = useMirroredQueues;
1476    }
1477
1478    /**
1479     * Sets the destination interceptors to use
1480     */
1481    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1482        this.destinationInterceptors = destinationInterceptors;
1483    }
1484
1485    public ActiveMQDestination[] getDestinations() {
1486        return destinations;
1487    }
1488
1489    /**
1490     * Sets the destinations which should be loaded/created on startup
1491     */
1492    public void setDestinations(ActiveMQDestination[] destinations) {
1493        this.destinations = destinations;
1494    }
1495
1496    /**
1497     * @return the tempDataStore
1498     */
1499    public synchronized PListStore getTempDataStore() {
1500        if (tempDataStore == null) {
1501            if (!isPersistent()) {
1502                return null;
1503            }
1504            boolean result = true;
1505            boolean empty = true;
1506            try {
1507                File directory = getTmpDataDirectory();
1508                if (directory.exists() && directory.isDirectory()) {
1509                    File[] files = directory.listFiles();
1510                    if (files != null && files.length > 0) {
1511                        empty = false;
1512                        for (int i = 0; i < files.length; i++) {
1513                            File file = files[i];
1514                            if (!file.isDirectory()) {
1515                                result &= file.delete();
1516                            }
1517                        }
1518                    }
1519                }
1520                if (!empty) {
1521                    String str = result ? "Successfully deleted" : "Failed to delete";
1522                    LOG.info(str + " temporary storage");
1523                }
1524                this.tempDataStore = new PListStore();
1525                this.tempDataStore.setDirectory(getTmpDataDirectory());
1526                configureService(tempDataStore);
1527                this.tempDataStore.start();
1528            } catch (Exception e) {
1529                throw new RuntimeException(e);
1530            }
1531        }
1532        return tempDataStore;
1533    }
1534
1535    /**
1536     * @param tempDataStore
1537     *            the tempDataStore to set
1538     */
1539    public void setTempDataStore(PListStore tempDataStore) {
1540        this.tempDataStore = tempDataStore;
1541        configureService(tempDataStore);
1542        try {
1543            tempDataStore.start();
1544        } catch (Exception e) {
1545            RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1546            LOG.error(exception.getLocalizedMessage(), e);
1547            throw exception;
1548        }
1549    }
1550
1551    public int getPersistenceThreadPriority() {
1552        return persistenceThreadPriority;
1553    }
1554
1555    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1556        this.persistenceThreadPriority = persistenceThreadPriority;
1557    }
1558
1559    /**
1560     * @return the useLocalHostBrokerName
1561     */
1562    public boolean isUseLocalHostBrokerName() {
1563        return this.useLocalHostBrokerName;
1564    }
1565
1566    /**
1567     * @param useLocalHostBrokerName
1568     *            the useLocalHostBrokerName to set
1569     */
1570    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1571        this.useLocalHostBrokerName = useLocalHostBrokerName;
1572        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1573            brokerName = LOCAL_HOST_NAME;
1574        }
1575    }
1576
1577    /**
1578     * @return the supportFailOver
1579     */
1580    public boolean isSupportFailOver() {
1581        return this.supportFailOver;
1582    }
1583
1584    /**
1585     * @param supportFailOver
1586     *            the supportFailOver to set
1587     */
1588    public void setSupportFailOver(boolean supportFailOver) {
1589        this.supportFailOver = supportFailOver;
1590    }
1591
1592    /**
1593     * Looks up and lazily creates if necessary the destination for the given
1594     * JMS name
1595     */
1596    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1597        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1598    }
1599
1600    public void removeDestination(ActiveMQDestination destination) throws Exception {
1601        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1602    }
1603
1604    public int getProducerSystemUsagePortion() {
1605        return producerSystemUsagePortion;
1606    }
1607
1608    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1609        this.producerSystemUsagePortion = producerSystemUsagePortion;
1610    }
1611
1612    public int getConsumerSystemUsagePortion() {
1613        return consumerSystemUsagePortion;
1614    }
1615
1616    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1617        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1618    }
1619
1620    public boolean isSplitSystemUsageForProducersConsumers() {
1621        return splitSystemUsageForProducersConsumers;
1622    }
1623
1624    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1625        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1626    }
1627
1628    public boolean isMonitorConnectionSplits() {
1629        return monitorConnectionSplits;
1630    }
1631
1632    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1633        this.monitorConnectionSplits = monitorConnectionSplits;
1634    }
1635
1636    public int getTaskRunnerPriority() {
1637        return taskRunnerPriority;
1638    }
1639
1640    public void setTaskRunnerPriority(int taskRunnerPriority) {
1641        this.taskRunnerPriority = taskRunnerPriority;
1642    }
1643
1644    public boolean isDedicatedTaskRunner() {
1645        return dedicatedTaskRunner;
1646    }
1647
1648    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1649        this.dedicatedTaskRunner = dedicatedTaskRunner;
1650    }
1651
1652    public boolean isCacheTempDestinations() {
1653        return cacheTempDestinations;
1654    }
1655
1656    public void setCacheTempDestinations(boolean cacheTempDestinations) {
1657        this.cacheTempDestinations = cacheTempDestinations;
1658    }
1659
1660    public int getTimeBeforePurgeTempDestinations() {
1661        return timeBeforePurgeTempDestinations;
1662    }
1663
1664    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1665        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1666    }
1667
1668    public boolean isUseTempMirroredQueues() {
1669        return useTempMirroredQueues;
1670    }
1671
1672    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1673        this.useTempMirroredQueues = useTempMirroredQueues;
1674    }
1675
1676    //
1677    // Implementation methods
1678    // -------------------------------------------------------------------------
1679    /**
1680     * Handles any lazy-creation helper properties which are added to make
1681     * things easier to configure inside environments such as Spring
1682     *
1683     * @throws Exception
1684     */
1685    protected void processHelperProperties() throws Exception {
1686        boolean masterServiceExists = false;
1687        if (transportConnectorURIs != null) {
1688            for (int i = 0; i < transportConnectorURIs.length; i++) {
1689                String uri = transportConnectorURIs[i];
1690                addConnector(uri);
1691            }
1692        }
1693        if (networkConnectorURIs != null) {
1694            for (int i = 0; i < networkConnectorURIs.length; i++) {
1695                String uri = networkConnectorURIs[i];
1696                addNetworkConnector(uri);
1697            }
1698        }
1699        if (jmsBridgeConnectors != null) {
1700            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1701                addJmsConnector(jmsBridgeConnectors[i]);
1702            }
1703        }
1704        for (Service service : services) {
1705            if (service instanceof MasterConnector) {
1706                masterServiceExists = true;
1707                break;
1708            }
1709        }
1710        if (masterConnectorURI != null) {
1711            if (masterServiceExists) {
1712                throw new IllegalStateException(
1713                        "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1714            } else {
1715                addService(new MasterConnector(masterConnectorURI));
1716            }
1717        }
1718    }
1719
1720    protected void checkSystemUsageLimits() throws IOException {
1721        SystemUsage usage = getSystemUsage();
1722        long memLimit = usage.getMemoryUsage().getLimit();
1723        long jvmLimit = Runtime.getRuntime().maxMemory();
1724
1725        if (memLimit > jvmLimit) {
1726            LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1727                      " mb) is more than the maximum available for the JVM: " +
1728                      jvmLimit / (1024 * 1024) + " mb");
1729        }
1730
1731        if (getPersistenceAdapter() != null) {
1732            PersistenceAdapter adapter = getPersistenceAdapter();
1733            File dir = adapter.getDirectory();
1734
1735            if (dir != null) {
1736                String dirPath = dir.getAbsolutePath();
1737                if (!dir.isAbsolute()) {
1738                    dir = new File(dirPath);
1739                }
1740
1741                while (dir != null && dir.isDirectory() == false) {
1742                    dir = dir.getParentFile();
1743                }
1744                long storeLimit = usage.getStoreUsage().getLimit();
1745                long dirFreeSpace = dir.getUsableSpace();
1746                if (storeLimit > dirFreeSpace) {
1747                    LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1748                             " mb, whilst the data directory: " + dir.getAbsolutePath() +
1749                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1750                }
1751            }
1752
1753            long maxJournalFileSize = 0;
1754            long storeLimit = usage.getStoreUsage().getLimit();
1755
1756            if (adapter instanceof KahaDBPersistenceAdapter) {
1757                KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
1758                maxJournalFileSize = kahaDB.getJournalMaxFileLength();
1759            } else if (adapter instanceof AMQPersistenceAdapter) {
1760                AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
1761                maxJournalFileSize = amqAdapter.getMaxFileLength();
1762            }
1763
1764            if (storeLimit < maxJournalFileSize) {
1765                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
1766                          " mb, whilst the max journal file size for the store is: " +
1767                          maxJournalFileSize / (1024 * 1024) + " mb, " +
1768                          "the store will not accept any data when used.");
1769            }
1770        }
1771
1772        File tmpDir = getTmpDataDirectory();
1773        if (tmpDir != null) {
1774
1775            String tmpDirPath = tmpDir.getAbsolutePath();
1776            if (!tmpDir.isAbsolute()) {
1777                tmpDir = new File(tmpDirPath);
1778            }
1779
1780            long storeLimit = usage.getTempUsage().getLimit();
1781            while (tmpDir != null && tmpDir.isDirectory() == false) {
1782                tmpDir = tmpDir.getParentFile();
1783            }
1784            long dirFreeSpace = tmpDir.getUsableSpace();
1785            if (storeLimit > dirFreeSpace) {
1786                LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1787                          " mb, whilst the temporary data directory: " + tmpDirPath +
1788                          " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1789            }
1790
1791            long maxJournalFileSize;
1792
1793            if (usage.getTempUsage().getStore() != null) {
1794                maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
1795            } else {
1796                maxJournalFileSize = org.apache.kahadb.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
1797            }
1798
1799            if (storeLimit < maxJournalFileSize) {
1800                LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1801                          " mb, whilst the max journal file size for the temporary store is: " +
1802                          maxJournalFileSize / (1024 * 1024) + " mb, " +
1803                          "the temp store will not accept any data when used.");
1804            }
1805        }
1806    }
1807
1808    public void stopAllConnectors(ServiceStopper stopper) {
1809        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1810            NetworkConnector connector = iter.next();
1811            unregisterNetworkConnectorMBean(connector);
1812            stopper.stop(connector);
1813        }
1814        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1815            ProxyConnector connector = iter.next();
1816            stopper.stop(connector);
1817        }
1818        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1819            JmsConnector connector = iter.next();
1820            stopper.stop(connector);
1821        }
1822        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1823            TransportConnector connector = iter.next();
1824            stopper.stop(connector);
1825        }
1826    }
1827
1828    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1829        try {
1830            ObjectName objectName = createConnectorObjectName(connector);
1831            connector = connector.asManagedConnector(getManagementContext(), objectName);
1832            ConnectorViewMBean view = new ConnectorView(connector);
1833            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1834            return connector;
1835        } catch (Throwable e) {
1836            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1837        }
1838    }
1839
1840    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1841        if (isUseJmx()) {
1842            try {
1843                ObjectName objectName = createConnectorObjectName(connector);
1844                getManagementContext().unregisterMBean(objectName);
1845            } catch (Throwable e) {
1846                throw IOExceptionSupport.create(
1847                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1848            }
1849        }
1850    }
1851
1852    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1853        return adaptor;
1854    }
1855
1856    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1857        if (isUseJmx()) {
1858        }
1859    }
1860
1861    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1862        return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1863                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1864                + JMXSupport.encodeObjectNamePart(connector.getName()));
1865    }
1866
1867    protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1868        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1869        try {
1870            ObjectName objectName = createNetworkConnectorObjectName(connector);
1871            connector.setObjectName(objectName);
1872            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1873        } catch (Throwable e) {
1874            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1875        }
1876    }
1877
1878    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1879            throws MalformedObjectNameException {
1880        return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1881                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1882                + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1883    }
1884
1885
1886    public ObjectName createDuplexNetworkConnectorObjectName(String transport)
1887            throws MalformedObjectNameException {
1888        return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1889                + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1890                + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
1891    }
1892
1893    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1894        if (isUseJmx()) {
1895            try {
1896                ObjectName objectName = createNetworkConnectorObjectName(connector);
1897                getManagementContext().unregisterMBean(objectName);
1898            } catch (Exception e) {
1899                LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1900            }
1901        }
1902    }
1903
1904    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1905        ProxyConnectorView view = new ProxyConnectorView(connector);
1906        try {
1907            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1908                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1909                    + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1910            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1911        } catch (Throwable e) {
1912            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1913        }
1914    }
1915
1916    protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1917        FTConnectorView view = new FTConnectorView(connector);
1918        try {
1919            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1920                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1921            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1922        } catch (Throwable e) {
1923            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1924        }
1925    }
1926
1927    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1928        JmsConnectorView view = new JmsConnectorView(connector);
1929        try {
1930            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1931                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1932                    + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1933            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1934        } catch (Throwable e) {
1935            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1936        }
1937    }
1938
1939    /**
1940     * Factory method to create a new broker
1941     *
1942     * @throws Exception
1943     * @throws
1944     * @throws
1945     */
1946    protected Broker createBroker() throws Exception {
1947        regionBroker = createRegionBroker();
1948        Broker broker = addInterceptors(regionBroker);
1949        // Add a filter that will stop access to the broker once stopped
1950        broker = new MutableBrokerFilter(broker) {
1951            Broker old;
1952
1953            @Override
1954            public void stop() throws Exception {
1955                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
1956                    // Just ignore additional stop actions.
1957                    @Override
1958                    public void stop() throws Exception {
1959                    }
1960                });
1961                old.stop();
1962            }
1963
1964            @Override
1965            public void start() throws Exception {
1966                if (forceStart && old != null) {
1967                    this.next.set(old);
1968                }
1969                getNext().start();
1970            }
1971        };
1972        return broker;
1973    }
1974
1975    /**
1976     * Factory method to create the core region broker onto which interceptors
1977     * are added
1978     *
1979     * @throws Exception
1980     */
1981    protected Broker createRegionBroker() throws Exception {
1982        if (destinationInterceptors == null) {
1983            destinationInterceptors = createDefaultDestinationInterceptor();
1984        }
1985        configureServices(destinationInterceptors);
1986        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1987        if (destinationFactory == null) {
1988            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1989        }
1990        return createRegionBroker(destinationInterceptor);
1991    }
1992
1993    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1994        RegionBroker regionBroker;
1995        if (isUseJmx()) {
1996            regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1997                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
1998        } else {
1999            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2000                    destinationInterceptor,getScheduler(),getExecutor());
2001        }
2002        destinationFactory.setRegionBroker(regionBroker);
2003        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2004        regionBroker.setBrokerName(getBrokerName());
2005        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2006        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2007        if (brokerId != null) {
2008            regionBroker.setBrokerId(brokerId);
2009        }
2010        return regionBroker;
2011    }
2012
2013    /**
2014     * Create the default destination interceptor
2015     */
2016    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2017        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2018        if (isUseVirtualTopics()) {
2019            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2020            VirtualTopic virtualTopic = new VirtualTopic();
2021            virtualTopic.setName("VirtualTopic.>");
2022            VirtualDestination[] virtualDestinations = { virtualTopic };
2023            interceptor.setVirtualDestinations(virtualDestinations);
2024            answer.add(interceptor);
2025        }
2026        if (isUseMirroredQueues()) {
2027            MirroredQueue interceptor = new MirroredQueue();
2028            answer.add(interceptor);
2029        }
2030        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2031        answer.toArray(array);
2032        return array;
2033    }
2034
2035    /**
2036     * Strategy method to add interceptors to the broker
2037     *
2038     * @throws IOException
2039     */
2040    protected Broker addInterceptors(Broker broker) throws Exception {
2041        if (isSchedulerSupport()) {
2042            SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
2043            if (isUseJmx()) {
2044                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2045                try {
2046                    ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
2047                            + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
2048                            + "Type=jobScheduler," + "jobSchedulerName=JMS");
2049
2050                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2051                    this.adminView.setJMSJobScheduler(objectName);
2052                } catch (Throwable e) {
2053                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2054                            + e.getMessage(), e);
2055                }
2056
2057            }
2058            broker = sb;
2059        }
2060        if (isAdvisorySupport()) {
2061            broker = new AdvisoryBroker(broker);
2062        }
2063        broker = new CompositeDestinationBroker(broker);
2064        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2065        if (isPopulateJMSXUserID()) {
2066            UserIDBroker userIDBroker = new UserIDBroker(broker);
2067            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2068            broker = userIDBroker;
2069        }
2070        if (isMonitorConnectionSplits()) {
2071            broker = new ConnectionSplitBroker(broker);
2072        }
2073        if (plugins != null) {
2074            for (int i = 0; i < plugins.length; i++) {
2075                BrokerPlugin plugin = plugins[i];
2076                broker = plugin.installPlugin(broker);
2077            }
2078        }
2079        return broker;
2080    }
2081
2082    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2083        if (isPersistent()) {
2084            PersistenceAdapterFactory fac = getPersistenceFactory();
2085            if (fac != null) {
2086                return fac.createPersistenceAdapter();
2087            }else {
2088                KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
2089                File dir = new File(getBrokerDataDirectory(),"KahaDB");
2090                adaptor.setDirectory(dir);
2091                return adaptor;
2092            }
2093        } else {
2094            return new MemoryPersistenceAdapter();
2095        }
2096    }
2097
2098    protected ObjectName createBrokerObjectName() throws IOException {
2099        try {
2100            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2101                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
2102        } catch (Throwable e) {
2103            throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
2104        }
2105    }
2106
2107    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2108        TransportServer transport = TransportFactory.bind(this, brokerURI);
2109        return new TransportConnector(transport);
2110    }
2111
2112    /**
2113     * Extracts the port from the options
2114     */
2115    protected Object getPort(Map options) {
2116        Object port = options.get("port");
2117        if (port == null) {
2118            port = DEFAULT_PORT;
2119            LOG.warn("No port specified so defaulting to: " + port);
2120        }
2121        return port;
2122    }
2123
2124    protected void addShutdownHook() {
2125        if (useShutdownHook) {
2126            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2127                @Override
2128                public void run() {
2129                    containerShutdown();
2130                }
2131            };
2132            Runtime.getRuntime().addShutdownHook(shutdownHook);
2133        }
2134    }
2135
2136    protected void removeShutdownHook() {
2137        if (shutdownHook != null) {
2138            try {
2139                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2140            } catch (Exception e) {
2141                LOG.debug("Caught exception, must be shutting down: " + e);
2142            }
2143        }
2144    }
2145
2146    /**
2147     * Sets hooks to be executed when broker shut down
2148     *
2149     * @org.apache.xbean.Property
2150     */
2151    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2152        for (Runnable hook : hooks) {
2153            addShutdownHook(hook);
2154        }
2155    }
2156
2157    /**
2158     * Causes a clean shutdown of the container when the VM is being shut down
2159     */
2160    protected void containerShutdown() {
2161        try {
2162            stop();
2163        } catch (IOException e) {
2164            Throwable linkedException = e.getCause();
2165            if (linkedException != null) {
2166                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2167            } else {
2168                logError("Failed to shut down: " + e, e);
2169            }
2170            if (!useLoggingForShutdownErrors) {
2171                e.printStackTrace(System.err);
2172            }
2173        } catch (Exception e) {
2174            logError("Failed to shut down: " + e, e);
2175        }
2176    }
2177
2178    protected void logError(String message, Throwable e) {
2179        if (useLoggingForShutdownErrors) {
2180            LOG.error("Failed to shut down: " + e);
2181        } else {
2182            System.err.println("Failed to shut down: " + e);
2183        }
2184    }
2185
2186    /**
2187     * Starts any configured destinations on startup
2188     */
2189    protected void startDestinations() throws Exception {
2190        if (destinations != null) {
2191            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2192            for (int i = 0; i < destinations.length; i++) {
2193                ActiveMQDestination destination = destinations[i];
2194                getBroker().addDestination(adminConnectionContext, destination,true);
2195            }
2196        }
2197        if (isUseVirtualTopics()) {
2198            startVirtualConsumerDestinations();
2199        }
2200    }
2201
2202    /**
2203     * Returns the broker's administration connection context used for
2204     * configuring the broker at startup
2205     */
2206    public ConnectionContext getAdminConnectionContext() throws Exception {
2207        return BrokerSupport.getConnectionContext(getBroker());
2208    }
2209
2210    protected void waitForSlave() {
2211        try {
2212            if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2213                throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
2214            }
2215        } catch (InterruptedException e) {
2216            LOG.error("Exception waiting for slave:" + e);
2217        }
2218    }
2219
2220    protected void slaveConnectionEstablished() {
2221        slaveStartSignal.countDown();
2222    }
2223
2224    protected void startManagementContext() throws Exception {
2225        getManagementContext().start();
2226        adminView = new BrokerView(this, null);
2227        ObjectName objectName = getBrokerObjectName();
2228        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2229    }
2230
2231    /**
2232     * Start all transport and network connections, proxies and bridges
2233     *
2234     * @throws Exception
2235     */
2236    public void startAllConnectors() throws Exception {
2237        if (!isSlave()) {
2238            Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2239            List<TransportConnector> al = new ArrayList<TransportConnector>();
2240            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2241                TransportConnector connector = iter.next();
2242                connector.setBrokerService(this);
2243                al.add(startTransportConnector(connector));
2244            }
2245            if (al.size() > 0) {
2246                // let's clear the transportConnectors list and replace it with
2247                // the started transportConnector instances
2248                this.transportConnectors.clear();
2249                setTransportConnectors(al);
2250            }
2251            URI uri = getVmConnectorURI();
2252            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2253            map.put("network", "true");
2254            map.put("async", "false");
2255            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2256            if (isWaitForSlave()) {
2257                waitForSlave();
2258            }
2259            if (!stopped.get()) {
2260                ThreadPoolExecutor networkConnectorStartExecutor = null;
2261                if (isNetworkConnectorStartAsync()) {
2262                    // spin up as many threads as needed
2263                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2264                            10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2265                            new ThreadFactory() {
2266                                int count=0;
2267                                public Thread newThread(Runnable runnable) {
2268                                    Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2269                                    thread.setDaemon(true);
2270                                    return thread;
2271                                }
2272                            });
2273                }
2274
2275                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2276                    final NetworkConnector connector = iter.next();
2277                    connector.setLocalUri(uri);
2278                    connector.setBrokerName(getBrokerName());
2279                    connector.setDurableDestinations(durableDestinations);
2280                    if (getDefaultSocketURIString() != null) {
2281                        connector.setBrokerURL(getDefaultSocketURIString());
2282                    }
2283                    if (networkConnectorStartExecutor != null) {
2284                        networkConnectorStartExecutor.execute(new Runnable() {
2285                            public void run() {
2286                                try {
2287                                    LOG.info("Async start of " + connector);
2288                                    connector.start();
2289                                } catch(Exception e) {
2290                                    LOG.error("Async start of network connector: " + connector + " failed", e);
2291                                }
2292                            }
2293                        });
2294                    } else {
2295                        connector.start();
2296                    }
2297                }
2298                if (networkConnectorStartExecutor != null) {
2299                    // executor done when enqueued tasks are complete
2300                    networkConnectorStartExecutor.shutdown();
2301                    networkConnectorStartExecutor = null;
2302                }
2303
2304                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2305                    ProxyConnector connector = iter.next();
2306                    connector.start();
2307                }
2308                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2309                    JmsConnector connector = iter.next();
2310                    connector.start();
2311                }
2312                for (Service service : services) {
2313                    configureService(service);
2314                    service.start();
2315                }
2316            }
2317        }
2318    }
2319
2320    protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2321        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2322        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2323        if (policy != null) {
2324            connector.setMessageAuthorizationPolicy(policy);
2325        }
2326        if (isUseJmx()) {
2327            connector = registerConnectorMBean(connector);
2328        }
2329        connector.getStatistics().setEnabled(enableStatistics);
2330        connector.start();
2331        return connector;
2332    }
2333
2334    /**
2335     * Perform any custom dependency injection
2336     */
2337    protected void configureServices(Object[] services) {
2338        for (Object service : services) {
2339            configureService(service);
2340        }
2341    }
2342
2343    /**
2344     * Perform any custom dependency injection
2345     */
2346    protected void configureService(Object service) {
2347        if (service instanceof BrokerServiceAware) {
2348            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2349            serviceAware.setBrokerService(this);
2350        }
2351        if (masterConnector == null) {
2352            if (service instanceof MasterConnector) {
2353                masterConnector = (MasterConnector) service;
2354                supportFailOver = true;
2355            }
2356        }
2357    }
2358
2359    public void handleIOException(IOException exception) {
2360        if (ioExceptionHandler != null) {
2361            ioExceptionHandler.handle(exception);
2362         } else {
2363            LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
2364         }
2365    }
2366
2367    protected void startVirtualConsumerDestinations() throws Exception {
2368        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2369        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2370        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2371        if (!destinations.isEmpty()) {
2372            for (ActiveMQDestination destination : destinations) {
2373                if (filter.matches(destination) == true) {
2374                    broker.addDestination(adminConnectionContext, destination, false);
2375                }
2376            }
2377        }
2378    }
2379
2380    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2381        // created at startup, so no sync needed
2382        if (virtualConsumerDestinationFilter == null) {
2383            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2384            for (DestinationInterceptor interceptor : destinationInterceptors) {
2385                if (interceptor instanceof VirtualDestinationInterceptor) {
2386                    VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2387                    for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2388                        if (virtualDestination instanceof VirtualTopic) {
2389                            consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2390                        }
2391                    }
2392                }
2393            }
2394            ActiveMQQueue filter = new ActiveMQQueue();
2395            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2396            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2397        }
2398        return virtualConsumerDestinationFilter;
2399    }
2400
2401    protected synchronized ThreadPoolExecutor getExecutor() {
2402        if (this.executor == null) {
2403            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2404
2405                private long i = 0;
2406
2407                @Override
2408                public Thread newThread(Runnable runnable) {
2409                    this.i++;
2410                    Thread thread = new Thread(runnable, "BrokerService.worker." + this.i);
2411                    thread.setDaemon(true);
2412                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2413                        @Override
2414                        public void uncaughtException(final Thread t, final Throwable e) {
2415                            LOG.error("Error in thread '{}'", t.getName(), e);
2416                        }
2417                    });
2418                    return thread;
2419                }
2420            }, new RejectedExecutionHandler() {
2421                @Override
2422                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2423                    try {
2424                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2425                    } catch (InterruptedException e) {
2426                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2427                    }
2428
2429                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2430                }
2431            });
2432        }
2433        return this.executor;
2434    }
2435
2436    public synchronized Scheduler getScheduler() {
2437        if (this.scheduler==null) {
2438            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2439            try {
2440                this.scheduler.start();
2441            } catch (Exception e) {
2442               LOG.error("Failed to start Scheduler ",e);
2443            }
2444        }
2445        return this.scheduler;
2446    }
2447
2448    public Broker getRegionBroker() {
2449        return regionBroker;
2450    }
2451
    /**
     * Replaces the region broker reference held by this service.
     *
     * @param regionBroker the broker to store; it is stored as given
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }
2455
2456    public void addShutdownHook(Runnable hook) {
2457        synchronized (shutdownHooks) {
2458            shutdownHooks.add(hook);
2459        }
2460    }
2461
2462    public void removeShutdownHook(Runnable hook) {
2463        synchronized (shutdownHooks) {
2464            shutdownHooks.remove(hook);
2465        }
2466    }
2467
2468    public boolean isSystemExitOnShutdown() {
2469        return systemExitOnShutdown;
2470    }
2471
2472    /**
2473     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2474     */
2475    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2476        this.systemExitOnShutdown = systemExitOnShutdown;
2477    }
2478
2479    public int getSystemExitOnShutdownExitCode() {
2480        return systemExitOnShutdownExitCode;
2481    }
2482
    /**
     * Sets the systemExitOnShutdownExitCode value.
     *
     * @param systemExitOnShutdownExitCode the exit code to store
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }
2486
2487    public SslContext getSslContext() {
2488        return sslContext;
2489    }
2490
    /**
     * Sets the SSL context held by this broker service.
     *
     * @param sslContext the SSL context to store
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }
2494
2495    public boolean isShutdownOnSlaveFailure() {
2496        return shutdownOnSlaveFailure;
2497    }
2498
2499    /**
2500     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2501     */
2502    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2503        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2504    }
2505
2506    public boolean isWaitForSlave() {
2507        return waitForSlave;
2508    }
2509
2510    /**
2511     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2512     */
2513    public void setWaitForSlave(boolean waitForSlave) {
2514        this.waitForSlave = waitForSlave;
2515    }
2516
2517    public long getWaitForSlaveTimeout() {
2518        return this.waitForSlaveTimeout;
2519    }
2520
    /**
     * Sets the waitForSlaveTimeout value.
     *
     * @param waitForSlaveTimeout the timeout to store (time unit not visible here - TODO confirm)
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }
2524
2525    public CountDownLatch getSlaveStartSignal() {
2526        return slaveStartSignal;
2527    }
2528
2529    /**
2530     * Get the passiveSlave
2531     * @return the passiveSlave
2532     */
2533    public boolean isPassiveSlave() {
2534        return this.passiveSlave;
2535    }
2536
2537    /**
2538     * Set the passiveSlave
2539     * @param passiveSlave the passiveSlave to set
2540     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2541     */
2542    public void setPassiveSlave(boolean passiveSlave) {
2543        this.passiveSlave = passiveSlave;
2544    }
2545
2546    /**
2547     * override the Default IOException handler, called when persistence adapter
2548     * has experiences File or JDBC I/O Exceptions
2549     *
2550     * @param ioExceptionHandler
2551     */
2552    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2553        configureService(ioExceptionHandler);
2554        this.ioExceptionHandler = ioExceptionHandler;
2555    }
2556
2557    public IOExceptionHandler getIoExceptionHandler() {
2558        return ioExceptionHandler;
2559    }
2560
2561    /**
2562     * @return the schedulerSupport
2563     */
2564    public boolean isSchedulerSupport() {
2565        return this.schedulerSupport;
2566    }
2567
2568    /**
2569     * @param schedulerSupport the schedulerSupport to set
2570     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2571     */
2572    public void setSchedulerSupport(boolean schedulerSupport) {
2573        this.schedulerSupport = schedulerSupport;
2574    }
2575
2576    /**
2577     * @return the schedulerDirectory
2578     */
2579    public File getSchedulerDirectoryFile() {
2580        if (this.schedulerDirectoryFile == null) {
2581            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2582        }
2583        return schedulerDirectoryFile;
2584    }
2585
2586    /**
2587     * @param schedulerDirectory the schedulerDirectory to set
2588     */
2589    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2590        this.schedulerDirectoryFile = schedulerDirectory;
2591    }
2592
2593    public void setSchedulerDirectory(String schedulerDirectory) {
2594        setSchedulerDirectoryFile(new File(schedulerDirectory));
2595    }
2596
2597    public int getSchedulePeriodForDestinationPurge() {
2598        return this.schedulePeriodForDestinationPurge;
2599    }
2600
    /**
     * Sets the schedulePeriodForDestinationPurge value.
     *
     * @param schedulePeriodForDestinationPurge the period to store (time unit not visible here - TODO confirm)
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }
2604
2605    public int getMaxPurgedDestinationsPerSweep() {
2606        return this.maxPurgedDestinationsPerSweep;
2607    }
2608
    /**
     * Sets the maxPurgedDestinationsPerSweep value.
     *
     * @param maxPurgedDestinationsPerSweep the limit to store
     */
    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
    }
2612
2613    public BrokerContext getBrokerContext() {
2614        return brokerContext;
2615    }
2616
    /**
     * Sets the broker context held by this service.
     *
     * @param brokerContext the broker context to store
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
2620
2621    public void setBrokerId(String brokerId) {
2622        this.brokerId = new BrokerId(brokerId);
2623    }
2624
2625    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2626        return useAuthenticatedPrincipalForJMSXUserID;
2627    }
2628
    /**
     * Sets the useAuthenticatedPrincipalForJMSXUserID flag.
     *
     * @param useAuthenticatedPrincipalForJMSXUserID the new flag value
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }
2632
2633    /**
2634     * Should MBeans that support showing the Authenticated User Name information have this
2635     * value filled in or not.
2636     *
2637     * @return true if user names should be exposed in MBeans
2638     */
2639    public boolean isPopulateUserNameInMBeans() {
2640        return this.populateUserNameInMBeans;
2641    }
2642
2643    /**
2644     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2645     * @param true if MBeans should expose user name information.
2646     */
2647    public void setPopulateUserNameInMBeans(boolean value) {
2648        this.populateUserNameInMBeans = value;
2649    }
2650
2651    public boolean isNetworkConnectorStartAsync() {
2652        return networkConnectorStartAsync;
2653    }
2654
    /**
     * Sets the networkConnectorStartAsync flag.
     *
     * @param networkConnectorStartAsync the new flag value
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }
2658
2659    public boolean isAllowTempAutoCreationOnSend() {
2660        return allowTempAutoCreationOnSend;
2661    }
2662
2663    /**
2664     * enable if temp destinations need to be propagated through a network when
2665     * advisorySupport==false. This is used in conjunction with the policy
2666     * gcInactiveDestinations for matching temps so they can get removed
2667     * when inactive
2668     *
2669     * @param allowTempAutoCreationOnSend
2670     */
2671    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2672        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2673    }
2674
2675    public int getOfflineDurableSubscriberTimeout() {
2676        return offlineDurableSubscriberTimeout;
2677    }
2678
    /**
     * Sets the offlineDurableSubscriberTimeout value.
     *
     * @param offlineDurableSubscriberTimeout the timeout to store (time unit not visible here - TODO confirm)
     */
    public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
    }
2682
2683    public int getOfflineDurableSubscriberTaskSchedule() {
2684        return offlineDurableSubscriberTaskSchedule;
2685    }
2686
    /**
     * Sets the offlineDurableSubscriberTaskSchedule value.
     *
     * @param offlineDurableSubscriberTaskSchedule the schedule value to store (time unit not visible here - TODO confirm)
     */
    public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
    }
2690
2691    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2692        return isUseVirtualTopics() && destination.isQueue() &&
2693                getVirtualTopicConsumerDestinationFilter().matches(destination);
2694    }
2695}