001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021 022import javax.jms.InvalidSelectorException; 023import javax.management.ObjectName; 024 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ConsumerInfo; 028import org.apache.activemq.command.MessageAck; 029import org.apache.activemq.command.MessageDispatchNotification; 030import org.apache.activemq.command.MessagePull; 031import org.apache.activemq.command.Response; 032import org.apache.activemq.filter.MessageEvaluationContext; 033 034/** 035 * 036 */ 037public interface Subscription extends SubscriptionRecovery { 038 039 /** 040 * Used to add messages that match the subscription. 041 * @param node 042 * @throws Exception 043 * @throws InterruptedException 044 * @throws IOException 045 */ 046 void add(MessageReference node) throws Exception; 047 048 /** 049 * Used when client acknowledge receipt of dispatched message. 050 * @param node 051 * @throws IOException 052 * @throws Exception 053 */ 054 void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception; 055 056 057 /** 058 * Allows a consumer to pull a message on demand 059 */ 060 Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception; 061 062 /** 063 * Is the subscription interested in the message? 064 * @param node 065 * @param context 066 * @return 067 * @throws IOException 068 */ 069 boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException; 070 071 /** 072 * Is the subscription interested in messages in the destination? 073 * @param context 074 * @return 075 */ 076 boolean matches(ActiveMQDestination destination); 077 078 /** 079 * The subscription will be receiving messages from the destination. 080 * @param context 081 * @param destination 082 * @throws Exception 083 */ 084 void add(ConnectionContext context, Destination destination) throws Exception; 085 086 /** 087 * The subscription will be no longer be receiving messages from the destination. 088 * @param context 089 * @param destination 090 * @return a list of un-acked messages that were added to the subscription. 091 */ 092 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception; 093 094 /** 095 * The ConsumerInfo object that created the subscription. 096 * @param destination 097 */ 098 ConsumerInfo getConsumerInfo(); 099 100 /** 101 * The subscription should release as may references as it can to help the garbage collector 102 * reclaim memory. 103 */ 104 void gc(); 105 106 /** 107 * Used by a Slave Broker to update dispatch infomation 108 * @param mdn 109 * @throws Exception 110 */ 111 void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception; 112 113 /** 114 * @return true if the broker is currently in slave mode 115 */ 116 boolean isSlave(); 117 118 /** 119 * @return number of messages pending delivery 120 */ 121 int getPendingQueueSize(); 122 123 /** 124 * @return number of messages dispatched to the client 125 */ 126 int getDispatchedQueueSize(); 127 128 /** 129 * @return number of messages dispatched to the client 130 */ 131 long getDispatchedCounter(); 132 133 /** 134 * @return number of messages that matched the subscription 135 */ 136 long getEnqueueCounter(); 137 138 /** 139 * @return number of messages queued by the client 140 */ 141 long getDequeueCounter(); 142 143 /** 144 * @return the JMS selector on the current subscription 145 */ 146 String getSelector(); 147 148 /** 149 * Attempts to change the current active selector on the subscription. 150 * This operation is not supported for persistent topics. 151 */ 152 void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException; 153 154 /** 155 * @return the JMX object name that this subscription was registered as if applicable 156 */ 157 ObjectName getObjectName(); 158 159 /** 160 * Set when the subscription is registered in JMX 161 */ 162 void setObjectName(ObjectName objectName); 163 164 /** 165 * @return true when 60% or more room is left for dispatching messages 166 */ 167 boolean isLowWaterMark(); 168 169 /** 170 * @return true when 10% or less room is left for dispatching messages 171 */ 172 boolean isHighWaterMark(); 173 174 /** 175 * @return true if there is no space to dispatch messages 176 */ 177 boolean isFull(); 178 179 /** 180 * inform the MessageConsumer on the client to change it's prefetch 181 * @param newPrefetch 182 */ 183 void updateConsumerPrefetch(int newPrefetch); 184 185 186 /** 187 * Called when the subscription is destroyed. 188 */ 189 void destroy(); 190 191 /** 192 * @return the prefetch size that is configured for the subscription 193 */ 194 int getPrefetchSize(); 195 196 /** 197 * @return the number of messages awaiting acknowledgement 198 */ 199 int getInFlightSize(); 200 201 /** 202 * @return the in flight messages as a percentage of the prefetch size 203 */ 204 int getInFlightUsage(); 205 206 /** 207 * Informs the Broker if the subscription needs to intervention to recover it's state 208 * e.g. DurableTopicSubscriber may do 209 * @see org.apache.activemq.region.cursors.PendingMessageCursor 210 * @return true if recovery required 211 */ 212 boolean isRecoveryRequired(); 213 214 215 /** 216 * @return true if a browser 217 */ 218 boolean isBrowser(); 219 220 /** 221 * @return the number of messages this subscription can accept before its full 222 */ 223 int countBeforeFull(); 224 225 ConnectionContext getContext(); 226 227 public int getCursorMemoryHighWaterMark(); 228 229 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); 230 231 boolean isSlowConsumer(); 232 233 void unmatched(MessageReference node) throws IOException; 234}