001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.locks.Lock;
024import java.util.concurrent.locks.ReentrantLock;
025
026import javax.jms.JMSException;
027import javax.jms.ServerSession;
028import javax.jms.ServerSessionPool;
029import javax.jms.Session;
030import javax.resource.spi.UnavailableException;
031import javax.resource.spi.endpoint.MessageEndpoint;
032
033import org.apache.activemq.ActiveMQQueueSession;
034import org.apache.activemq.ActiveMQSession;
035import org.apache.activemq.ActiveMQTopicSession;
036import org.apache.activemq.command.MessageDispatch;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 *  $Date$
042 */
043public class ServerSessionPoolImpl implements ServerSessionPool {
044
045    private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class);
046
047    private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
048    private final int maxSessions;
049
050    private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
051    private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
052    private final Lock sessionLock = new ReentrantLock();
053    private final AtomicBoolean closing = new AtomicBoolean(false);
054
055    public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
056        this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
057        this.maxSessions = maxSessions;
058    }
059
060    private ServerSessionImpl createServerSessionImpl() throws JMSException {
061        MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
062        int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
063        final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
064        MessageEndpoint endpoint;
065        try {
066            int batchSize = 0;
067            if (activationSpec.getEnableBatchBooleanValue()) {
068                batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
069            }
070            if (activationSpec.isUseRAManagedTransactionEnabled()) {
071                // The RA will manage the transaction commit.
072                endpoint = createEndpoint(null);
073                return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
074            } else {
075                // Give the container an object to manage to transaction with.
076                endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
077                return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
078            }
079        } catch (UnavailableException e) {
080            // The container could be limiting us on the number of endpoints
081            // that are being created.
082            if (LOG.isDebugEnabled()) {
083                LOG.debug("Could not create an endpoint.", e);
084            }
085            session.close();
086            return null;
087        }
088    }
089
090    private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
091        MessageEndpoint endpoint;
092        endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
093        MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
094        return endpointProxy;
095    }
096
097    /**
098     */
099    public ServerSession getServerSession() throws JMSException {
100        if (LOG.isDebugEnabled()) {
101            LOG.debug("ServerSession requested.");
102        }
103        if (closing.get()) {
104            throw new JMSException("Session Pool Shutting Down.");
105        }
106        ServerSessionImpl ss = null;
107        sessionLock.lock();
108        try {
109            ss = getExistingServerSession(false);
110        } finally {
111            sessionLock.unlock();
112        }
113        if (ss != null) {
114            return ss;
115        }
116        ss = createServerSessionImpl();
117        sessionLock.lock();
118        try {
119            // We may not be able to create a session due to the container
120            // restricting us.
121            if (ss == null) {
122                if (activeSessions.isEmpty() && idleSessions.isEmpty()) {
123                    throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
124                }
125
126                ss = getExistingServerSession(true);
127            } else {
128                activeSessions.add(ss);
129            }
130        } finally {
131            sessionLock.unlock();
132        }
133        if (LOG.isDebugEnabled()) {
134            LOG.debug("Created a new session: " + ss);
135        }
136        return ss;
137
138    }
139
140    /**
141     * Must be called with sessionLock held.
142     * Returns an idle session if one exists or an active session if no more
143     * sessions can be created.  Sessions can not be created if force is true
144     * or activeSessions >= maxSessions.
145     * @param force do not check activeSessions >= maxSessions, return an active connection anyway.
146     * @return an already existing session.
147     */
148    private ServerSessionImpl getExistingServerSession(boolean force) {
149        ServerSessionImpl ss = null;
150        if (idleSessions.size() > 0) {
151            ss = idleSessions.remove(idleSessions.size() - 1);
152        }
153        if (ss != null) {
154            activeSessions.add(ss);
155            if (LOG.isDebugEnabled()) {
156                LOG.debug("Using idle session: " + ss);
157            }
158        } else if (force || activeSessions.size() >= maxSessions) {
159            // If we are at the upper limit
160            // then reuse the already created sessions..
161            // This is going to queue up messages into a session for
162            // processing.
163            ss = getExistingActiveServerSession();
164        }
165        return ss;
166    }
167
168    /**
169     * Must be called with sessionLock held.
170     * Returns the first session from activeSessions, shifting it to last.
171     * @return session
172     */
173    private ServerSessionImpl getExistingActiveServerSession() {
174        ServerSessionImpl ss = null;
175        if (!activeSessions.isEmpty()) {
176            if (activeSessions.size() > 1) {
177                // round robin
178                ss = activeSessions.remove(0);
179                activeSessions.add(ss);
180            } else {
181                ss = activeSessions.get(0);
182            }
183        }
184        if (LOG.isDebugEnabled()) {
185            LOG.debug("Reusing an active session: " + ss);
186        }
187        return ss;
188    }
189
190    public void returnToPool(ServerSessionImpl ss) {
191        sessionLock.lock();
192            activeSessions.remove(ss);
193        try {
194            // make sure we only return non-stale sessions to the pool
195            if ( ss.isStale() ) {
196                if ( LOG.isDebugEnabled() ) {
197                    LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
198                }
199                ss.close();
200            } else {
201                if (LOG.isDebugEnabled()) {
202                    LOG.debug("ServerSession returned to pool: " + ss);
203                }
204            idleSessions.add(ss);
205            }
206        } finally {
207            sessionLock.unlock();
208        }
209        synchronized (closing) {
210            closing.notify();
211        }
212    }
213
214    public void removeFromPool(ServerSessionImpl ss) {
215        sessionLock.lock();
216        try {
217            activeSessions.remove(ss);
218        } finally {
219            sessionLock.unlock();
220        }
221        try {
222            ActiveMQSession session = (ActiveMQSession)ss.getSession();
223            List l = session.getUnconsumedMessages();
224            for (Iterator i = l.iterator(); i.hasNext();) {
225                dispatchToSession((MessageDispatch)i.next());
226            }
227        } catch (Throwable t) {
228            LOG.error("Error redispatching unconsumed messages from stale session", t);
229        }
230        ss.close();
231        synchronized (closing) {
232            closing.notify();
233        }
234    }
235
236    /**
237     * @param messageDispatch
238     *            the message to dispatch
239     * @throws JMSException
240     */
241    private void dispatchToSession(MessageDispatch messageDispatch)
242            throws JMSException {
243
244        ServerSession serverSession = getServerSession();
245        Session s = serverSession.getSession();
246        ActiveMQSession session = null;
247        if (s instanceof ActiveMQSession) {
248            session = (ActiveMQSession) s;
249        } else if (s instanceof ActiveMQQueueSession) {
250            session = (ActiveMQSession) s;
251        } else if (s instanceof ActiveMQTopicSession) {
252            session = (ActiveMQSession) s;
253        } else {
254            activeMQAsfEndpointWorker.getConnection()
255                    .onAsyncException(new JMSException(
256                            "Session pool provided an invalid session type: "
257                                    + s.getClass()));
258        }
259        session.dispatch(messageDispatch);
260        serverSession.start();
261    }
262
263    public void close() {
264        closing.set(true);
265        int activeCount = closeIdleSessions();
266        // we may have to wait erroneously 250ms if an
267        // active session is removed during our wait and we
268        // are not notified
269        while (activeCount > 0) {
270            if (LOG.isDebugEnabled()) {
271                LOG.debug("Active Sessions = " + activeCount);
272            }
273            try {
274                synchronized (closing) {
275                    closing.wait(250);
276                }
277            } catch (InterruptedException e) {
278                Thread.currentThread().interrupt();
279                return;
280            }
281            activeCount = closeIdleSessions();
282        }
283    }
284
285
286    protected int closeIdleSessions() {
287        sessionLock.lock();
288        try {
289            for (ServerSessionImpl ss : idleSessions) {
290                ss.close();
291            }
292            idleSessions.clear();
293            return activeSessions.size();
294        } finally {
295            sessionLock.unlock();
296        }
297    }
298
299    /**
300     * @return Returns the closing.
301     */
302    public boolean isClosing() {
303        return closing.get();
304    }
305
306    /**
307     * @param closing The closing to set.
308     */
309    public void setClosing(boolean closing) {
310        this.closing.set(closing);
311    }
312
313}