001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import org.apache.activemq.command.ConsumerId;
020import org.apache.activemq.command.Message;
021import org.apache.activemq.command.MessageId;
022
023/**
024 * Keeps track of a message that is flowing through the Broker. This object may
025 * hold a hard reference to the message or only hold the id of the message if
026 * the message has been persisted on in a MessageStore.
027 * 
028 * 
029 */
030public class IndirectMessageReference implements QueueMessageReference {
031
032    /** The subscription that has locked the message */
033    private LockOwner lockOwner;
034    /** Has the message been dropped? */
035    private boolean dropped;
036    /** Has the message been acked? */
037    private boolean acked;
038    /** Direct reference to the message */
039    private final Message message;
040    
041    /**
042     * @param message
043     */
044    public IndirectMessageReference(final Message message) {
045        this.message = message;
046        message.getMessageId();
047        message.getGroupID();
048        message.getGroupSequence();
049    }
050
051    public Message getMessageHardRef() {
052        return message;
053    }
054
055    public int getReferenceCount() {
056        return message.getReferenceCount();
057    }
058
059    public int incrementReferenceCount() {
060        return message.incrementReferenceCount();
061    }
062
063    public int decrementReferenceCount() {
064        return message.decrementReferenceCount();
065    }
066
067    public Message getMessage() {
068        return message;
069    }
070
071    public String toString() {
072        return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
073    }
074
075    public void incrementRedeliveryCounter() {
076        message.incrementRedeliveryCounter();
077    }
078
079    public synchronized boolean isDropped() {
080        return dropped;
081    }
082
083    public synchronized void drop() {
084        dropped = true;
085        lockOwner = null;
086        message.decrementReferenceCount();
087    }
088
089    public boolean lock(LockOwner subscription) {
090        synchronized (this) {
091            if (dropped || lockOwner != null) {
092                return false;
093            }
094            lockOwner = subscription;
095            return true;
096        }
097    }
098
099    public synchronized boolean unlock() {
100        boolean result = lockOwner != null;
101        lockOwner = null;
102        return result;
103    }
104
105    public synchronized LockOwner getLockOwner() {
106        return lockOwner;
107    }
108
109    public int getRedeliveryCounter() {
110        return message.getRedeliveryCounter();
111    }
112
113    public MessageId getMessageId() {
114        return message.getMessageId();
115    }
116
117    public Destination getRegionDestination() {
118        return message.getRegionDestination();
119    }
120
121    public boolean isPersistent() {
122        return message.isPersistent();
123    }
124
125    public synchronized boolean isLocked() {
126        return lockOwner != null;
127    }
128
129    public synchronized boolean isAcked() {
130        return acked;
131    }
132
133    public synchronized void setAcked(boolean b) {
134        acked = b;
135    }
136
137    public String getGroupID() {
138        return message.getGroupID();
139    }
140
141    public int getGroupSequence() {
142        return message.getGroupSequence();
143    }
144
145    public ConsumerId getTargetConsumerId() {
146        return message.getTargetConsumerId();
147    }
148
149    public long getExpiration() {
150        return message.getExpiration();
151    }
152
153    public boolean isExpired() {
154        return message.isExpired();
155    }
156
157    public synchronized int getSize() {
158       return message.getSize();
159    }
160
161    public boolean isAdvisory() {
162       return message.isAdvisory();
163    }
164}