001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import java.io.DataInputStream; 020 import java.io.IOException; 021 import java.util.ArrayList; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.Map.Entry; 027 import org.apache.activemq.broker.ConnectionContext; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ActiveMQQueue; 030 import org.apache.activemq.command.ActiveMQTempQueue; 031 import org.apache.activemq.command.ActiveMQTempTopic; 032 import org.apache.activemq.command.ActiveMQTopic; 033 import org.apache.activemq.command.Message; 034 import org.apache.activemq.command.MessageAck; 035 import org.apache.activemq.command.MessageId; 036 import org.apache.activemq.command.ProducerId; 037 import org.apache.activemq.command.SubscriptionInfo; 038 import org.apache.activemq.command.TransactionId; 039 import org.apache.activemq.command.XATransactionId; 040 import org.apache.activemq.openwire.OpenWireFormat; 041 import org.apache.activemq.protobuf.Buffer; 042 import org.apache.activemq.store.AbstractMessageStore; 043 import org.apache.activemq.store.MessageRecoveryListener; 044 import org.apache.activemq.store.MessageStore; 045 import org.apache.activemq.store.PersistenceAdapter; 046 import org.apache.activemq.store.TopicMessageStore; 047 import org.apache.activemq.store.TransactionRecoveryListener; 048 import org.apache.activemq.store.TransactionStore; 049 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 050 import org.apache.activemq.store.kahadb.data.KahaDestination; 051 import org.apache.activemq.store.kahadb.data.KahaLocation; 052 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 053 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 054 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 055 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 056 import org.apache.activemq.usage.MemoryUsage; 057 import org.apache.activemq.usage.SystemUsage; 058 import org.apache.activemq.util.ByteSequence; 059 import org.apache.activemq.wireformat.WireFormat; 060 import org.apache.kahadb.journal.Location; 061 import org.apache.kahadb.page.Transaction; 062 063 public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter { 064 065 private final WireFormat wireFormat = new OpenWireFormat(); 066 067 public void setBrokerName(String brokerName) { 068 } 069 public void setUsageManager(SystemUsage usageManager) { 070 } 071 072 public TransactionStore createTransactionStore() throws IOException { 073 return new TransactionStore(){ 074 075 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 076 if (preCommit != null) { 077 preCommit.run(); 078 } 079 processCommit(txid); 080 if (postCommit != null) { 081 postCommit.run(); 082 } 083 } 084 public void prepare(TransactionId txid) throws IOException { 085 processPrepare(txid); 086 } 087 public void rollback(TransactionId txid) throws IOException { 088 processRollback(txid); 089 } 090 public void recover(TransactionRecoveryListener listener) throws IOException { 091 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) { 092 XATransactionId xid = (XATransactionId)entry.getKey(); 093 ArrayList<Message> messageList = new ArrayList<Message>(); 094 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 095 096 for (Operation op : entry.getValue()) { 097 if( op.getClass() == AddOpperation.class ) { 098 AddOpperation addOp = (AddOpperation)op; 099 Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) ); 100 messageList.add(msg); 101 } else { 102 RemoveOpperation rmOp = (RemoveOpperation)op; 103 MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) ); 104 ackList.add(ack); 105 } 106 } 107 108 Message[] addedMessages = new Message[messageList.size()]; 109 MessageAck[] acks = new MessageAck[ackList.size()]; 110 messageList.toArray(addedMessages); 111 ackList.toArray(acks); 112 listener.recover(xid, addedMessages, acks); 113 } 114 } 115 public void start() throws Exception { 116 } 117 public void stop() throws Exception { 118 } 119 }; 120 } 121 122 public class KahaDBMessageStore extends AbstractMessageStore { 123 protected KahaDestination dest; 124 125 public KahaDBMessageStore(ActiveMQDestination destination) { 126 super(destination); 127 this.dest = convert( destination ); 128 } 129 130 @Override 131 public ActiveMQDestination getDestination() { 132 return destination; 133 } 134 135 public void addMessage(ConnectionContext context, Message message) throws IOException { 136 KahaAddMessageCommand command = new KahaAddMessageCommand(); 137 command.setDestination(dest); 138 command.setMessageId(message.getMessageId().toString()); 139 processAdd(command, message.getTransactionId(), wireFormat.marshal(message)); 140 } 141 142 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 143 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 144 command.setDestination(dest); 145 command.setMessageId(ack.getLastMessageId().toString()); 146 processRemove(command, ack.getTransactionId()); 147 } 148 149 public void removeAllMessages(ConnectionContext context) throws IOException { 150 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 151 command.setDestination(dest); 152 process(command); 153 } 154 155 public Message getMessage(MessageId identity) throws IOException { 156 final String key = identity.toString(); 157 158 // Hopefully one day the page file supports concurrent read operations... but for now we must 159 // externally synchronize... 160 ByteSequence data; 161 synchronized(indexMutex) { 162 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){ 163 public ByteSequence execute(Transaction tx) throws IOException { 164 StoredDestination sd = getStoredDestination(dest, tx); 165 Long sequence = sd.messageIdIndex.get(tx, key); 166 if( sequence ==null ) { 167 return null; 168 } 169 return sd.orderIndex.get(tx, sequence).data; 170 } 171 }); 172 } 173 if( data == null ) { 174 return null; 175 } 176 177 Message msg = (Message)wireFormat.unmarshal( data ); 178 return msg; 179 } 180 181 public int getMessageCount() throws IOException { 182 synchronized(indexMutex) { 183 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 184 public Integer execute(Transaction tx) throws IOException { 185 // Iterate through all index entries to get a count of messages in the destination. 186 StoredDestination sd = getStoredDestination(dest, tx); 187 int rc=0; 188 for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) { 189 iterator.next(); 190 rc++; 191 } 192 return rc; 193 } 194 }); 195 } 196 } 197 198 public void recover(final MessageRecoveryListener listener) throws Exception { 199 synchronized(indexMutex) { 200 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 201 public void execute(Transaction tx) throws Exception { 202 StoredDestination sd = getStoredDestination(dest, tx); 203 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { 204 Entry<Long, MessageRecord> entry = iterator.next(); 205 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) ); 206 } 207 } 208 }); 209 } 210 } 211 212 long cursorPos=0; 213 214 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 215 synchronized(indexMutex) { 216 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 217 public void execute(Transaction tx) throws Exception { 218 StoredDestination sd = getStoredDestination(dest, tx); 219 Entry<Long, MessageRecord> entry=null; 220 int counter = 0; 221 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 222 entry = iterator.next(); 223 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 224 counter++; 225 if( counter >= maxReturned ) { 226 break; 227 } 228 } 229 if( entry!=null ) { 230 cursorPos = entry.getKey()+1; 231 } 232 } 233 }); 234 } 235 } 236 237 public void resetBatching() { 238 cursorPos=0; 239 } 240 241 242 @Override 243 public void setBatch(MessageId identity) throws IOException { 244 final String key = identity.toString(); 245 246 // Hopefully one day the page file supports concurrent read operations... but for now we must 247 // externally synchronize... 248 Long location; 249 synchronized(indexMutex) { 250 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){ 251 public Long execute(Transaction tx) throws IOException { 252 StoredDestination sd = getStoredDestination(dest, tx); 253 return sd.messageIdIndex.get(tx, key); 254 } 255 }); 256 } 257 if( location!=null ) { 258 cursorPos=location+1; 259 } 260 261 } 262 263 @Override 264 public void setMemoryUsage(MemoryUsage memoeyUSage) { 265 } 266 @Override 267 public void start() throws Exception { 268 } 269 @Override 270 public void stop() throws Exception { 271 } 272 273 } 274 275 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 276 public KahaDBTopicMessageStore(ActiveMQTopic destination) { 277 super(destination); 278 } 279 280 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 281 MessageId messageId, MessageAck ack) throws IOException { 282 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 283 command.setDestination(dest); 284 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 285 command.setMessageId(messageId.toString()); 286 // We are not passed a transaction info.. so we can't participate in a transaction. 287 // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack 288 // to pass back to the XA recover method. 289 // command.setTransactionInfo(); 290 processRemove(command, null); 291 } 292 293 public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 294 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); 295 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 296 command.setDestination(dest); 297 command.setSubscriptionKey(subscriptionKey); 298 command.setRetroactive(retroactive); 299 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 300 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 301 process(command); 302 } 303 304 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 305 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 306 command.setDestination(dest); 307 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 308 process(command); 309 } 310 311 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 312 313 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 314 synchronized(indexMutex) { 315 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 316 public void execute(Transaction tx) throws IOException { 317 StoredDestination sd = getStoredDestination(dest, tx); 318 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { 319 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 320 SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) ); 321 subscriptions.add(info); 322 323 } 324 } 325 }); 326 } 327 328 SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; 329 subscriptions.toArray(rc); 330 return rc; 331 } 332 333 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 334 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 335 synchronized(indexMutex) { 336 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){ 337 public SubscriptionInfo execute(Transaction tx) throws IOException { 338 StoredDestination sd = getStoredDestination(dest, tx); 339 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 340 if( command ==null ) { 341 return null; 342 } 343 return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) ); 344 } 345 }); 346 } 347 } 348 349 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 350 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 351 synchronized(indexMutex) { 352 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 353 public Integer execute(Transaction tx) throws IOException { 354 StoredDestination sd = getStoredDestination(dest, tx); 355 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 356 if ( cursorPos==null ) { 357 // The subscription might not exist. 358 return 0; 359 } 360 cursorPos += 1; 361 362 int counter = 0; 363 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 364 iterator.next(); 365 counter++; 366 } 367 return counter; 368 } 369 }); 370 } 371 } 372 373 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 374 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 375 synchronized(indexMutex) { 376 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 377 public void execute(Transaction tx) throws Exception { 378 StoredDestination sd = getStoredDestination(dest, tx); 379 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 380 cursorPos += 1; 381 382 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 383 Entry<Long, MessageRecord> entry = iterator.next(); 384 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 385 } 386 } 387 }); 388 } 389 } 390 391 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { 392 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 393 synchronized(indexMutex) { 394 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 395 public void execute(Transaction tx) throws Exception { 396 StoredDestination sd = getStoredDestination(dest, tx); 397 Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); 398 if( cursorPos == null ) { 399 cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 400 cursorPos += 1; 401 } 402 403 Entry<Long, MessageRecord> entry=null; 404 int counter = 0; 405 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 406 entry = iterator.next(); 407 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 408 counter++; 409 if( counter >= maxReturned ) { 410 break; 411 } 412 } 413 if( entry!=null ) { 414 sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); 415 } 416 } 417 }); 418 } 419 } 420 421 public void resetBatching(String clientId, String subscriptionName) { 422 try { 423 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 424 synchronized(indexMutex) { 425 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 426 public void execute(Transaction tx) throws IOException { 427 StoredDestination sd = getStoredDestination(dest, tx); 428 sd.subscriptionCursors.remove(subscriptionKey); 429 } 430 }); 431 } 432 } catch (IOException e) { 433 throw new RuntimeException(e); 434 } 435 } 436 } 437 438 String subscriptionKey(String clientId, String subscriptionName){ 439 return clientId+":"+subscriptionName; 440 } 441 442 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 443 return new KahaDBMessageStore(destination); 444 } 445 446 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 447 return new KahaDBTopicMessageStore(destination); 448 } 449 450 /** 451 * Cleanup method to remove any state associated with the given destination. 452 * This method does not stop the message store (it might not be cached). 453 * 454 * @param destination Destination to forget 455 */ 456 public void removeQueueMessageStore(ActiveMQQueue destination) { 457 } 458 459 /** 460 * Cleanup method to remove any state associated with the given destination 461 * This method does not stop the message store (it might not be cached). 462 * 463 * @param destination Destination to forget 464 */ 465 public void removeTopicMessageStore(ActiveMQTopic destination) { 466 } 467 468 public void deleteAllMessages() throws IOException { 469 } 470 471 472 public Set<ActiveMQDestination> getDestinations() { 473 try { 474 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 475 synchronized(indexMutex) { 476 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 477 public void execute(Transaction tx) throws IOException { 478 for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) { 479 Entry<String, StoredDestination> entry = iterator.next(); 480 rc.add(convert(entry.getKey())); 481 } 482 } 483 }); 484 } 485 return rc; 486 } catch (IOException e) { 487 throw new RuntimeException(e); 488 } 489 } 490 491 public long getLastMessageBrokerSequenceId() throws IOException { 492 return 0; 493 } 494 495 public long size() { 496 if ( !started.get() ) { 497 return 0; 498 } 499 try { 500 return pageFile.getDiskSize(); 501 } catch (IOException e) { 502 throw new RuntimeException(e); 503 } 504 } 505 506 public void beginTransaction(ConnectionContext context) throws IOException { 507 throw new IOException("Not yet implemented."); 508 } 509 public void commitTransaction(ConnectionContext context) throws IOException { 510 throw new IOException("Not yet implemented."); 511 } 512 public void rollbackTransaction(ConnectionContext context) throws IOException { 513 throw new IOException("Not yet implemented."); 514 } 515 516 public void checkpoint(boolean sync) throws IOException { 517 } 518 519 /////////////////////////////////////////////////////////////////// 520 // Internal conversion methods. 521 /////////////////////////////////////////////////////////////////// 522 523 524 525 KahaLocation convert(Location location) { 526 KahaLocation rc = new KahaLocation(); 527 rc.setLogId(location.getDataFileId()); 528 rc.setOffset(location.getOffset()); 529 return rc; 530 } 531 532 KahaDestination convert(ActiveMQDestination dest) { 533 KahaDestination rc = new KahaDestination(); 534 rc.setName(dest.getPhysicalName()); 535 switch( dest.getDestinationType() ) { 536 case ActiveMQDestination.QUEUE_TYPE: 537 rc.setType(DestinationType.QUEUE); 538 return rc; 539 case ActiveMQDestination.TOPIC_TYPE: 540 rc.setType(DestinationType.TOPIC); 541 return rc; 542 case ActiveMQDestination.TEMP_QUEUE_TYPE: 543 rc.setType(DestinationType.TEMP_QUEUE); 544 return rc; 545 case ActiveMQDestination.TEMP_TOPIC_TYPE: 546 rc.setType(DestinationType.TEMP_TOPIC); 547 return rc; 548 default: 549 return null; 550 } 551 } 552 553 ActiveMQDestination convert(String dest) { 554 int p = dest.indexOf(":"); 555 if( p<0 ) { 556 throw new IllegalArgumentException("Not in the valid destination format"); 557 } 558 int type = Integer.parseInt(dest.substring(0, p)); 559 String name = dest.substring(p+1); 560 561 switch( KahaDestination.DestinationType.valueOf(type) ) { 562 case QUEUE: 563 return new ActiveMQQueue(name); 564 case TOPIC: 565 return new ActiveMQTopic(name); 566 case TEMP_QUEUE: 567 return new ActiveMQTempQueue(name); 568 case TEMP_TOPIC: 569 return new ActiveMQTempTopic(name); 570 default: 571 throw new IllegalArgumentException("Not in the valid destination format"); 572 } 573 } 574 575 public long getLastProducerSequenceId(ProducerId id) { 576 return -1; 577 } 578 579 }