001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store; 018 019import java.io.IOException; 020 021import org.apache.activemq.broker.ConnectionContext; 022import org.apache.activemq.command.ActiveMQDestination; 023import org.apache.activemq.command.Message; 024import org.apache.activemq.command.MessageAck; 025import org.apache.activemq.command.MessageId; 026import org.apache.activemq.command.SubscriptionInfo; 027import org.apache.activemq.usage.MemoryUsage; 028 029/** 030 * A simple proxy that delegates to another MessageStore. 031 */ 032public class ProxyTopicMessageStore implements TopicMessageStore { 033 034 final TopicMessageStore delegate; 035 036 public ProxyTopicMessageStore(TopicMessageStore delegate) { 037 this.delegate = delegate; 038 } 039 040 public MessageStore getDelegate() { 041 return delegate; 042 } 043 044 @Override 045 public void addMessage(ConnectionContext context, Message message) throws IOException { 046 delegate.addMessage(context, message); 047 } 048 049 @Override 050 public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 051 delegate.addMessage(context, message, canOptimizeHint); 052 } 053 054 @Override 055 public Message getMessage(MessageId identity) throws IOException { 056 return delegate.getMessage(identity); 057 } 058 059 @Override 060 public void recover(MessageRecoveryListener listener) throws Exception { 061 delegate.recover(listener); 062 } 063 064 @Override 065 public void removeAllMessages(ConnectionContext context) throws IOException { 066 delegate.removeAllMessages(context); 067 } 068 069 @Override 070 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 071 delegate.removeMessage(context, ack); 072 } 073 074 @Override 075 public void start() throws Exception { 076 delegate.start(); 077 } 078 079 @Override 080 public void stop() throws Exception { 081 delegate.stop(); 082 } 083 084 @Override 085 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 086 return delegate.lookupSubscription(clientId, subscriptionName); 087 } 088 089 @Override 090 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 091 MessageId messageId, MessageAck ack) throws IOException { 092 delegate.acknowledge(context, clientId, subscriptionName, messageId, ack); 093 } 094 095 @Override 096 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 097 delegate.addSubscription(subscriptionInfo, retroactive); 098 } 099 100 @Override 101 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 102 delegate.deleteSubscription(clientId, subscriptionName); 103 } 104 105 @Override 106 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 107 throws Exception { 108 delegate.recoverSubscription(clientId, subscriptionName, listener); 109 } 110 111 @Override 112 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 113 MessageRecoveryListener listener) throws Exception { 114 delegate.recoverNextMessages(clientId, subscriptionName, maxReturned, listener); 115 } 116 117 @Override 118 public void resetBatching(String clientId, String subscriptionName) { 119 delegate.resetBatching(clientId, subscriptionName); 120 } 121 122 @Override 123 public ActiveMQDestination getDestination() { 124 return delegate.getDestination(); 125 } 126 127 @Override 128 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 129 return delegate.getAllSubscriptions(); 130 } 131 132 @Override 133 public void setMemoryUsage(MemoryUsage memoryUsage) { 134 delegate.setMemoryUsage(memoryUsage); 135 } 136 137 @Override 138 public int getMessageCount(String clientId, String subscriberName) throws IOException { 139 return delegate.getMessageCount(clientId, subscriberName); 140 } 141 142 @Override 143 public int getMessageCount() throws IOException { 144 return delegate.getMessageCount(); 145 } 146 147 @Override 148 public long getMessageSize() throws IOException { 149 return delegate.getMessageSize(); 150 } 151 152 @Override 153 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 154 delegate.recoverNextMessages(maxReturned, listener); 155 } 156 157 @Override 158 public void dispose(ConnectionContext context) { 159 delegate.dispose(context); 160 } 161 162 @Override 163 public void resetBatching() { 164 delegate.resetBatching(); 165 } 166 167 @Override 168 public void setBatch(MessageId messageId) throws Exception { 169 delegate.setBatch(messageId); 170 } 171 172 @Override 173 public boolean isEmpty() throws Exception { 174 return delegate.isEmpty(); 175 } 176 177 @Override 178 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 179 return delegate.asyncAddTopicMessage(context, message); 180 } 181 182 @Override 183 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 184 return delegate.asyncAddTopicMessage(context,message, canOptimizeHint); 185 } 186 187 @Override 188 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 189 return delegate.asyncAddQueueMessage(context, message); 190 } 191 192 @Override 193 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 194 return delegate.asyncAddQueueMessage(context,message, canOptimizeHint); 195 } 196 197 @Override 198 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 199 delegate.removeAsyncMessage(context, ack); 200 } 201 202 @Override 203 public void setPrioritizedMessages(boolean prioritizedMessages) { 204 delegate.setPrioritizedMessages(prioritizedMessages); 205 } 206 207 @Override 208 public boolean isPrioritizedMessages() { 209 return delegate.isPrioritizedMessages(); 210 } 211 212 @Override 213 public void updateMessage(Message message) throws IOException { 214 delegate.updateMessage(message); 215 } 216 217 @Override 218 public void registerIndexListener(IndexListener indexListener) { 219 delegate.registerIndexListener(indexListener); 220 } 221 222 @Override 223 public MessageStoreStatistics getMessageStoreStatistics() { 224 return delegate.getMessageStoreStatistics(); 225 } 226 227 /* (non-Javadoc) 228 * @see org.apache.activemq.store.TopicMessageStore#getMessageSize(java.lang.String, java.lang.String) 229 */ 230 @Override 231 public long getMessageSize(String clientId, String subscriberName) 232 throws IOException { 233 return delegate.getMessageSize(clientId, subscriberName); 234 } 235 236 @Override 237 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 238 return delegate.getMessageStoreSubStatistics(); 239 } 240}