001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.IOException;
020import java.util.HashSet;
021import java.util.Set;
022import java.util.concurrent.locks.Lock;
023import java.util.concurrent.locks.ReentrantLock;
024
025import org.apache.activemq.ActiveMQMessageAudit;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.Message;
029import org.apache.activemq.command.MessageAck;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.kaha.MapContainer;
032import org.apache.activemq.kaha.MessageAckWithLocation;
033import org.apache.activemq.kaha.StoreEntry;
034import org.apache.activemq.store.AbstractMessageStore;
035import org.apache.activemq.store.MessageRecoveryListener;
036import org.apache.activemq.store.ReferenceStore;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * @author rajdavies
042 *
043 */
044public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
045
046    private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStore.class);
047    protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
048    protected KahaReferenceStoreAdapter adapter;
049    // keep track of dispatched messages so that duplicate sends that follow a successful
050    // dispatch can be suppressed.
051    protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit();
052    private StoreEntry batchEntry;
053    private String lastBatchId;
054    protected final Lock lock = new ReentrantLock();
055
056    public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
057                              ActiveMQDestination destination) throws IOException {
058        super(destination);
059        this.adapter = adapter;
060        this.messageContainer = container;
061    }
062    
063    public Lock getStoreLock() {
064        return lock;
065    }
066
067    public void dispose(ConnectionContext context) {
068        super.dispose(context);
069        this.messageContainer.delete();
070        this.adapter.removeReferenceStore(this);
071    }
072
073    protected MessageId getMessageId(Object object) {
074        return new MessageId(((ReferenceRecord)object).getMessageId());
075    }
076
077    public void addMessage(ConnectionContext context, Message message) throws IOException {
078        throw new RuntimeException("Use addMessageReference instead");
079    }
080
081    public Message getMessage(MessageId identity) throws IOException {
082        throw new RuntimeException("Use addMessageReference instead");
083    }
084
085    protected final boolean recoverReference(MessageRecoveryListener listener,
086            ReferenceRecord record) throws Exception {
087        MessageId id = new MessageId(record.getMessageId());
088        if (listener.hasSpace()) {
089            return listener.recoverMessageReference(id);
090        }
091        return false;
092    }
093
094    public void recover(MessageRecoveryListener listener) throws Exception {
095        lock.lock();
096        try {
097            for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
098                .getNext(entry)) {
099                ReferenceRecord record = messageContainer.getValue(entry);
100                if (!recoverReference(listener, record)) {
101                    break;
102                }
103            }
104        }finally {
105            lock.unlock();
106        }
107    }
108
109    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
110        throws Exception {
111        lock.lock();
112        try {
113            StoreEntry entry = batchEntry;
114            if (entry == null) {
115                entry = messageContainer.getFirst();
116            } else {
117                entry = messageContainer.refresh(entry);
118                if (entry != null) {
119                    entry = messageContainer.getNext(entry);
120                }
121            }
122            if (entry != null) {      
123                int count = 0;
124                do {
125                    ReferenceRecord msg = messageContainer.getValue(entry);
126                    if (msg != null ) {
127                        if (recoverReference(listener, msg)) {
128                            count++;
129                            lastBatchId = msg.getMessageId();
130                        } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
131                            if (LOG.isDebugEnabled()) {
132                                LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId());
133                            }
134                            // give usage limits a chance to reclaim
135                            break;
136                        } else {
137                            // skip duplicate and continue
138                            if (LOG.isDebugEnabled()) {
139                                LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId());
140                            }
141                        }                        
142                    } else {
143                        lastBatchId = null;
144                    }
145                    batchEntry = entry;
146                    entry = messageContainer.getNext(entry);
147                } while (entry != null && count < maxReturned && listener.hasSpace());
148            }
149        }finally {
150            lock.unlock();
151        }
152    }
153
154    public boolean addMessageReference(ConnectionContext context, MessageId messageId,
155                                                 ReferenceData data) throws IOException {
156        
157        boolean uniqueueReferenceAdded = false;
158        lock.lock();
159        try {
160            if (!isDuplicate(messageId)) {
161                ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
162                messageContainer.put(messageId, record);
163                uniqueueReferenceAdded = true;
164                addInterest(record);
165                if (LOG.isDebugEnabled()) {
166                    LOG.debug(destination.getPhysicalName() + " add: " + messageId);
167                }
168            }
169        } finally {
170            lock.unlock();
171        }
172        return uniqueueReferenceAdded;
173    }
174
175    protected boolean isDuplicate(final MessageId messageId) {
176        boolean duplicate = messageContainer.containsKey(messageId);
177        if (!duplicate) {
178            duplicate = dispatchAudit.isDuplicate(messageId);
179            if (duplicate) {
180                if (LOG.isDebugEnabled()) {
181                    LOG.debug(destination.getPhysicalName()
182                        + " ignoring duplicated (add) message reference, already dispatched: "
183                        + messageId);
184                }
185            }
186        } else if (LOG.isDebugEnabled()) {
187            LOG.debug(destination.getPhysicalName()
188                    + " ignoring duplicated (add) message reference, already in store: " + messageId);
189        }
190        return duplicate;
191    }
192    
193    public ReferenceData getMessageReference(MessageId identity) throws IOException {
194        lock.lock();
195        try {
196            ReferenceRecord result = messageContainer.get(identity);
197            if (result == null) {
198                return null;
199            }
200            return result.getData();
201        }finally {
202            lock.unlock();
203        }
204    }
205
206    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
207        lock.lock();
208        try {
209            MessageId msgId = ack.getLastMessageId();
210            StoreEntry entry = messageContainer.getEntry(msgId);
211            if (entry != null) {
212                ReferenceRecord rr = messageContainer.remove(msgId);
213                if (rr != null) {
214                    removeInterest(rr);
215                    if (ack instanceof MessageAckWithLocation) {
216                        recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId());
217                    }
218                    dispatchAudit.isDuplicate(msgId);
219                    if (LOG.isDebugEnabled()) {
220                        LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
221                    }
222                    if (messageContainer.isEmpty()
223                        || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
224                        || (batchEntry != null && batchEntry.equals(entry))) {
225                        resetBatching();
226                    }
227                }
228            }
229        }finally {
230            lock.unlock();
231        }
232    }
233
234    private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) {
235        adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId);
236    }
237
238    public void removeAllMessages(ConnectionContext context) throws IOException {
239        lock.lock();
240        try {
241            Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
242            MessageAck ack = new MessageAck();
243            for (MessageId id:tmpSet) {
244                ack.setLastMessageId(id);
245                removeMessage(null, ack);
246            }
247            resetBatching();
248            messageContainer.clear();
249        }finally {
250            lock.unlock();
251        }
252    }
253
254    public void delete() {
255        lock.lock();
256        try {
257            messageContainer.clear();
258        }finally {
259            lock.unlock();
260        }
261    }
262
263    public void resetBatching() {
264        lock.lock();
265        try {
266            batchEntry = null;
267            lastBatchId = null;
268        }finally {
269            lock.unlock();
270        }
271    }
272
273    public int getMessageCount() {
274        return messageContainer.size();
275    }
276
277    public boolean isSupportForCursors() {
278        return true;
279    }
280
281    public boolean supportsExternalBatchControl() {
282        return true;
283    }
284
285    void removeInterest(ReferenceRecord rr) {
286        adapter.removeInterestInRecordFile(rr.getData().getFileId());
287    }
288
289    void addInterest(ReferenceRecord rr) {
290        adapter.addInterestInRecordFile(rr.getData().getFileId());
291    }
292
293    /**
294     * @param startAfter
295     * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
296     */
297    public void setBatch(MessageId startAfter) {
298        lock.lock();
299        try {
300            batchEntry = messageContainer.getEntry(startAfter);
301            if (LOG.isDebugEnabled()) {
302                LOG.debug("setBatch: " + startAfter);
303            }
304        } finally {
305            lock.unlock();
306        }
307    }
308}