001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import java.util.HashSet; 021 import java.util.Set; 022 import java.util.concurrent.locks.Lock; 023 import java.util.concurrent.locks.ReentrantLock; 024 025 import org.apache.activemq.ActiveMQMessageAudit; 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.command.Message; 029 import org.apache.activemq.command.MessageAck; 030 import org.apache.activemq.command.MessageId; 031 import org.apache.activemq.kaha.MapContainer; 032 import org.apache.activemq.kaha.MessageAckWithLocation; 033 import org.apache.activemq.kaha.StoreEntry; 034 import org.apache.activemq.store.AbstractMessageStore; 035 import org.apache.activemq.store.MessageRecoveryListener; 036 import org.apache.activemq.store.ReferenceStore; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * @author rajdavies 042 * 043 */ 044 public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore { 045 046 private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStore.class); 047 protected final MapContainer<MessageId, ReferenceRecord> messageContainer; 048 protected KahaReferenceStoreAdapter adapter; 049 // keep track of dispatched messages so that duplicate sends that follow a successful 050 // dispatch can be suppressed. 051 protected ActiveMQMessageAudit dispatchAudit = new ActiveMQMessageAudit(); 052 private StoreEntry batchEntry; 053 private String lastBatchId; 054 protected final Lock lock = new ReentrantLock(); 055 056 public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container, 057 ActiveMQDestination destination) throws IOException { 058 super(destination); 059 this.adapter = adapter; 060 this.messageContainer = container; 061 } 062 063 public Lock getStoreLock() { 064 return lock; 065 } 066 067 public void dispose(ConnectionContext context) { 068 super.dispose(context); 069 this.messageContainer.delete(); 070 this.adapter.removeReferenceStore(this); 071 } 072 073 protected MessageId getMessageId(Object object) { 074 return new MessageId(((ReferenceRecord)object).getMessageId()); 075 } 076 077 public void addMessage(ConnectionContext context, Message message) throws IOException { 078 throw new RuntimeException("Use addMessageReference instead"); 079 } 080 081 public Message getMessage(MessageId identity) throws IOException { 082 throw new RuntimeException("Use addMessageReference instead"); 083 } 084 085 protected final boolean recoverReference(MessageRecoveryListener listener, 086 ReferenceRecord record) throws Exception { 087 MessageId id = new MessageId(record.getMessageId()); 088 if (listener.hasSpace()) { 089 return listener.recoverMessageReference(id); 090 } 091 return false; 092 } 093 094 public void recover(MessageRecoveryListener listener) throws Exception { 095 lock.lock(); 096 try { 097 for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer 098 .getNext(entry)) { 099 ReferenceRecord record = messageContainer.getValue(entry); 100 if (!recoverReference(listener, record)) { 101 break; 102 } 103 } 104 }finally { 105 lock.unlock(); 106 } 107 } 108 109 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) 110 throws Exception { 111 lock.lock(); 112 try { 113 StoreEntry entry = batchEntry; 114 if (entry == null) { 115 entry = messageContainer.getFirst(); 116 } else { 117 entry = messageContainer.refresh(entry); 118 if (entry != null) { 119 entry = messageContainer.getNext(entry); 120 } 121 } 122 if (entry != null) { 123 int count = 0; 124 do { 125 ReferenceRecord msg = messageContainer.getValue(entry); 126 if (msg != null ) { 127 if (recoverReference(listener, msg)) { 128 count++; 129 lastBatchId = msg.getMessageId(); 130 } else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) { 131 if (LOG.isDebugEnabled()) { 132 LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId()); 133 } 134 // give usage limits a chance to reclaim 135 break; 136 } else { 137 // skip duplicate and continue 138 if (LOG.isDebugEnabled()) { 139 LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId()); 140 } 141 } 142 } else { 143 lastBatchId = null; 144 } 145 batchEntry = entry; 146 entry = messageContainer.getNext(entry); 147 } while (entry != null && count < maxReturned && listener.hasSpace()); 148 } 149 }finally { 150 lock.unlock(); 151 } 152 } 153 154 public boolean addMessageReference(ConnectionContext context, MessageId messageId, 155 ReferenceData data) throws IOException { 156 157 boolean uniqueueReferenceAdded = false; 158 lock.lock(); 159 try { 160 if (!isDuplicate(messageId)) { 161 ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); 162 messageContainer.put(messageId, record); 163 uniqueueReferenceAdded = true; 164 addInterest(record); 165 if (LOG.isDebugEnabled()) { 166 LOG.debug(destination.getPhysicalName() + " add: " + messageId); 167 } 168 } 169 } finally { 170 lock.unlock(); 171 } 172 return uniqueueReferenceAdded; 173 } 174 175 protected boolean isDuplicate(final MessageId messageId) { 176 boolean duplicate = messageContainer.containsKey(messageId); 177 if (!duplicate) { 178 duplicate = dispatchAudit.isDuplicate(messageId); 179 if (duplicate) { 180 if (LOG.isDebugEnabled()) { 181 LOG.debug(destination.getPhysicalName() 182 + " ignoring duplicated (add) message reference, already dispatched: " 183 + messageId); 184 } 185 } 186 } else if (LOG.isDebugEnabled()) { 187 LOG.debug(destination.getPhysicalName() 188 + " ignoring duplicated (add) message reference, already in store: " + messageId); 189 } 190 return duplicate; 191 } 192 193 public ReferenceData getMessageReference(MessageId identity) throws IOException { 194 lock.lock(); 195 try { 196 ReferenceRecord result = messageContainer.get(identity); 197 if (result == null) { 198 return null; 199 } 200 return result.getData(); 201 }finally { 202 lock.unlock(); 203 } 204 } 205 206 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 207 lock.lock(); 208 try { 209 MessageId msgId = ack.getLastMessageId(); 210 StoreEntry entry = messageContainer.getEntry(msgId); 211 if (entry != null) { 212 ReferenceRecord rr = messageContainer.remove(msgId); 213 if (rr != null) { 214 removeInterest(rr); 215 if (ack instanceof MessageAckWithLocation) { 216 recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId()); 217 } 218 dispatchAudit.isDuplicate(msgId); 219 if (LOG.isDebugEnabled()) { 220 LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId); 221 } 222 if (messageContainer.isEmpty() 223 || (lastBatchId != null && lastBatchId.equals(msgId.toString())) 224 || (batchEntry != null && batchEntry.equals(entry))) { 225 resetBatching(); 226 } 227 } 228 } 229 }finally { 230 lock.unlock(); 231 } 232 } 233 234 private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) { 235 adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId); 236 } 237 238 public void removeAllMessages(ConnectionContext context) throws IOException { 239 lock.lock(); 240 try { 241 Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet()); 242 MessageAck ack = new MessageAck(); 243 for (MessageId id:tmpSet) { 244 ack.setLastMessageId(id); 245 removeMessage(null, ack); 246 } 247 resetBatching(); 248 messageContainer.clear(); 249 }finally { 250 lock.unlock(); 251 } 252 } 253 254 public void delete() { 255 lock.lock(); 256 try { 257 messageContainer.clear(); 258 }finally { 259 lock.unlock(); 260 } 261 } 262 263 public void resetBatching() { 264 lock.lock(); 265 try { 266 batchEntry = null; 267 lastBatchId = null; 268 }finally { 269 lock.unlock(); 270 } 271 } 272 273 public int getMessageCount() { 274 return messageContainer.size(); 275 } 276 277 public boolean isSupportForCursors() { 278 return true; 279 } 280 281 public boolean supportsExternalBatchControl() { 282 return true; 283 } 284 285 void removeInterest(ReferenceRecord rr) { 286 adapter.removeInterestInRecordFile(rr.getData().getFileId()); 287 } 288 289 void addInterest(ReferenceRecord rr) { 290 adapter.addInterestInRecordFile(rr.getData().getFileId()); 291 } 292 293 /** 294 * @param startAfter 295 * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId) 296 */ 297 public void setBatch(MessageId startAfter) { 298 lock.lock(); 299 try { 300 batchEntry = messageContainer.getEntry(startAfter); 301 if (LOG.isDebugEnabled()) { 302 LOG.debug("setBatch: " + startAfter); 303 } 304 } finally { 305 lock.unlock(); 306 } 307 } 308 }