001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc.adapter; 018 019 import java.io.IOException; 020 import java.io.PrintStream; 021 import java.sql.Connection; 022 import java.sql.PreparedStatement; 023 import java.sql.ResultSet; 024 import java.sql.ResultSetMetaData; 025 import java.sql.SQLException; 026 import java.sql.Statement; 027 import java.util.ArrayList; 028 import java.util.HashSet; 029 import java.util.LinkedList; 030 import java.util.Set; 031 import java.util.concurrent.locks.ReadWriteLock; 032 import java.util.concurrent.locks.ReentrantReadWriteLock; 033 034 import org.apache.activemq.command.ActiveMQDestination; 035 import org.apache.activemq.command.MessageId; 036 import org.apache.activemq.command.ProducerId; 037 import org.apache.activemq.command.SubscriptionInfo; 038 import org.apache.activemq.store.jdbc.JDBCAdapter; 039 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; 040 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 041 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 042 import org.apache.activemq.store.jdbc.Statements; 043 import org.apache.activemq.store.jdbc.TransactionContext; 044 import org.slf4j.Logger; 045 import org.slf4j.LoggerFactory; 046 047 /** 048 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is 049 * encouraged to override the default implementation of methods to account for differences in JDBC Driver 050 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/> 051 * The databases/JDBC drivers that use this adapter are: 052 * <ul> 053 * <li></li> 054 * </ul> 055 * 056 * @org.apache.xbean.XBean element="defaultJDBCAdapter" 057 * 058 * 059 */ 060 public class DefaultJDBCAdapter implements JDBCAdapter { 061 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); 062 public static final int MAX_ROWS = 10000; 063 protected Statements statements; 064 protected boolean batchStatments = true; 065 protected boolean prioritizedMessages; 066 protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); 067 // needs to be min twice the prefetch for a durable sub and large enough for selector range 068 protected int maxRows = MAX_ROWS; 069 070 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 071 s.setBytes(index, data); 072 } 073 074 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 075 return rs.getBytes(index); 076 } 077 078 public void doCreateTables(TransactionContext c) throws SQLException, IOException { 079 Statement s = null; 080 cleanupExclusiveLock.writeLock().lock(); 081 try { 082 // Check to see if the table already exists. If it does, then don't 083 // log warnings during startup. 084 // Need to run the scripts anyways since they may contain ALTER 085 // statements that upgrade a previous version 086 // of the table 087 boolean alreadyExists = false; 088 ResultSet rs = null; 089 try { 090 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), 091 new String[] { "TABLE" }); 092 alreadyExists = rs.next(); 093 } catch (Throwable ignore) { 094 } finally { 095 close(rs); 096 } 097 s = c.getConnection().createStatement(); 098 String[] createStatments = this.statements.getCreateSchemaStatements(); 099 for (int i = 0; i < createStatments.length; i++) { 100 // This will fail usually since the tables will be 101 // created already. 102 try { 103 LOG.debug("Executing SQL: " + createStatments[i]); 104 s.execute(createStatments[i]); 105 } catch (SQLException e) { 106 if (alreadyExists) { 107 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: " 108 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 109 + " Vendor code: " + e.getErrorCode()); 110 } else { 111 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " 112 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 113 + " Vendor code: " + e.getErrorCode()); 114 JDBCPersistenceAdapter.log("Failure details: ", e); 115 } 116 } 117 } 118 c.getConnection().commit(); 119 } finally { 120 cleanupExclusiveLock.writeLock().unlock(); 121 try { 122 s.close(); 123 } catch (Throwable e) { 124 } 125 } 126 } 127 128 public void doDropTables(TransactionContext c) throws SQLException, IOException { 129 Statement s = null; 130 cleanupExclusiveLock.writeLock().lock(); 131 try { 132 s = c.getConnection().createStatement(); 133 String[] dropStatments = this.statements.getDropSchemaStatements(); 134 for (int i = 0; i < dropStatments.length; i++) { 135 // This will fail usually since the tables will be 136 // created already. 137 try { 138 LOG.debug("Executing SQL: " + dropStatments[i]); 139 s.execute(dropStatments[i]); 140 } catch (SQLException e) { 141 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i] 142 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " 143 + e.getErrorCode()); 144 JDBCPersistenceAdapter.log("Failure details: ", e); 145 } 146 } 147 c.getConnection().commit(); 148 } finally { 149 cleanupExclusiveLock.writeLock().unlock(); 150 try { 151 s.close(); 152 } catch (Throwable e) { 153 } 154 } 155 } 156 157 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { 158 PreparedStatement s = null; 159 ResultSet rs = null; 160 cleanupExclusiveLock.readLock().lock(); 161 try { 162 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 163 rs = s.executeQuery(); 164 long seq1 = 0; 165 if (rs.next()) { 166 seq1 = rs.getLong(1); 167 } 168 rs.close(); 169 s.close(); 170 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); 171 rs = s.executeQuery(); 172 long seq2 = 0; 173 if (rs.next()) { 174 seq2 = rs.getLong(1); 175 } 176 long seq = Math.max(seq1, seq2); 177 return seq; 178 } finally { 179 cleanupExclusiveLock.readLock().unlock(); 180 close(rs); 181 close(s); 182 } 183 } 184 185 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { 186 PreparedStatement s = null; 187 ResultSet rs = null; 188 cleanupExclusiveLock.readLock().lock(); 189 try { 190 s = c.getConnection().prepareStatement( 191 this.statements.getFindMessageByIdStatement()); 192 s.setLong(1, storeSequenceId); 193 rs = s.executeQuery(); 194 if (!rs.next()) { 195 return null; 196 } 197 return getBinaryData(rs, 1); 198 } finally { 199 cleanupExclusiveLock.readLock().unlock(); 200 close(rs); 201 close(s); 202 } 203 } 204 205 206 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 207 long expiration, byte priority) throws SQLException, IOException { 208 PreparedStatement s = c.getAddMessageStatement(); 209 cleanupExclusiveLock.readLock().lock(); 210 try { 211 if (s == null) { 212 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 213 if (this.batchStatments) { 214 c.setAddMessageStatement(s); 215 } 216 } 217 s.setLong(1, sequence); 218 s.setString(2, messageID.getProducerId().toString()); 219 s.setLong(3, messageID.getProducerSequenceId()); 220 s.setString(4, destination.getQualifiedName()); 221 s.setLong(5, expiration); 222 s.setLong(6, priority); 223 setBinaryData(s, 7, data); 224 if (this.batchStatments) { 225 s.addBatch(); 226 } else if (s.executeUpdate() != 1) { 227 throw new SQLException("Failed add a message"); 228 } 229 } finally { 230 cleanupExclusiveLock.readLock().unlock(); 231 if (!this.batchStatments) { 232 if (s != null) { 233 s.close(); 234 } 235 } 236 } 237 } 238 239 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, 240 long expirationTime, String messageRef) throws SQLException, IOException { 241 PreparedStatement s = c.getAddMessageStatement(); 242 cleanupExclusiveLock.readLock().lock(); 243 try { 244 if (s == null) { 245 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 246 if (this.batchStatments) { 247 c.setAddMessageStatement(s); 248 } 249 } 250 s.setLong(1, messageID.getBrokerSequenceId()); 251 s.setString(2, messageID.getProducerId().toString()); 252 s.setLong(3, messageID.getProducerSequenceId()); 253 s.setString(4, destination.getQualifiedName()); 254 s.setLong(5, expirationTime); 255 s.setString(6, messageRef); 256 if (this.batchStatments) { 257 s.addBatch(); 258 } else if (s.executeUpdate() != 1) { 259 throw new SQLException("Failed add a message"); 260 } 261 } finally { 262 cleanupExclusiveLock.readLock().unlock(); 263 if (!this.batchStatments) { 264 s.close(); 265 } 266 } 267 } 268 269 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { 270 PreparedStatement s = null; 271 ResultSet rs = null; 272 cleanupExclusiveLock.readLock().lock(); 273 try { 274 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); 275 s.setString(1, messageID.getProducerId().toString()); 276 s.setLong(2, messageID.getProducerSequenceId()); 277 s.setString(3, destination.getQualifiedName()); 278 rs = s.executeQuery(); 279 if (!rs.next()) { 280 return new long[]{0,0}; 281 } 282 return new long[]{rs.getLong(1), rs.getLong(2)}; 283 } finally { 284 cleanupExclusiveLock.readLock().unlock(); 285 close(rs); 286 close(s); 287 } 288 } 289 290 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 291 PreparedStatement s = null; 292 ResultSet rs = null; 293 cleanupExclusiveLock.readLock().lock(); 294 try { 295 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 296 s.setString(1, id.getProducerId().toString()); 297 s.setLong(2, id.getProducerSequenceId()); 298 rs = s.executeQuery(); 299 if (!rs.next()) { 300 return null; 301 } 302 return getBinaryData(rs, 1); 303 } finally { 304 cleanupExclusiveLock.readLock().unlock(); 305 close(rs); 306 close(s); 307 } 308 } 309 310 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { 311 PreparedStatement s = null; 312 ResultSet rs = null; 313 cleanupExclusiveLock.readLock().lock(); 314 try { 315 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 316 s.setLong(1, seq); 317 rs = s.executeQuery(); 318 if (!rs.next()) { 319 return null; 320 } 321 return rs.getString(1); 322 } finally { 323 cleanupExclusiveLock.readLock().unlock(); 324 close(rs); 325 close(s); 326 } 327 } 328 329 public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException { 330 PreparedStatement s = c.getRemovedMessageStatement(); 331 cleanupExclusiveLock.readLock().lock(); 332 try { 333 if (s == null) { 334 s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement()); 335 if (this.batchStatments) { 336 c.setRemovedMessageStatement(s); 337 } 338 } 339 s.setLong(1, seq); 340 if (this.batchStatments) { 341 s.addBatch(); 342 } else if (s.executeUpdate() != 1) { 343 throw new SQLException("Failed to remove message"); 344 } 345 } finally { 346 cleanupExclusiveLock.readLock().unlock(); 347 if (!this.batchStatments && s != null) { 348 s.close(); 349 } 350 } 351 } 352 353 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) 354 throws Exception { 355 PreparedStatement s = null; 356 ResultSet rs = null; 357 cleanupExclusiveLock.readLock().lock(); 358 try { 359 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); 360 s.setString(1, destination.getQualifiedName()); 361 rs = s.executeQuery(); 362 if (this.statements.isUseExternalMessageReferences()) { 363 while (rs.next()) { 364 if (!listener.recoverMessageReference(rs.getString(2))) { 365 break; 366 } 367 } 368 } else { 369 while (rs.next()) { 370 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 371 break; 372 } 373 } 374 } 375 } finally { 376 cleanupExclusiveLock.readLock().unlock(); 377 close(rs); 378 close(s); 379 } 380 } 381 382 public void doMessageIdScan(TransactionContext c, int limit, 383 JDBCMessageIdScanListener listener) throws SQLException, IOException { 384 PreparedStatement s = null; 385 ResultSet rs = null; 386 cleanupExclusiveLock.readLock().lock(); 387 try { 388 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); 389 s.setMaxRows(limit); 390 rs = s.executeQuery(); 391 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid 392 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>(); 393 while (rs.next()) { 394 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); 395 } 396 if (LOG.isDebugEnabled()) { 397 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); 398 } 399 for (MessageId id : reverseOrderIds) { 400 listener.messageId(id); 401 } 402 } finally { 403 cleanupExclusiveLock.readLock().unlock(); 404 close(rs); 405 close(s); 406 } 407 } 408 409 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, 410 String subscriptionName, long seq, long prio) throws SQLException, IOException { 411 PreparedStatement s = c.getUpdateLastAckStatement(); 412 cleanupExclusiveLock.readLock().lock(); 413 try { 414 if (s == null) { 415 s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement()); 416 if (this.batchStatments) { 417 c.setUpdateLastAckStatement(s); 418 } 419 } 420 s.setLong(1, seq); 421 s.setString(2, destination.getQualifiedName()); 422 s.setString(3, clientId); 423 s.setString(4, subscriptionName); 424 s.setLong(5, prio); 425 if (this.batchStatments) { 426 s.addBatch(); 427 } else if (s.executeUpdate() != 1) { 428 throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName); 429 } 430 } finally { 431 cleanupExclusiveLock.readLock().unlock(); 432 if (!this.batchStatments) { 433 close(s); 434 } 435 } 436 } 437 438 439 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, 440 String subscriptionName, long seq, long priority) throws SQLException, IOException { 441 PreparedStatement s = c.getUpdateLastAckStatement(); 442 cleanupExclusiveLock.readLock().lock(); 443 try { 444 if (s == null) { 445 s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement()); 446 if (this.batchStatments) { 447 c.setUpdateLastAckStatement(s); 448 } 449 } 450 s.setLong(1, seq); 451 s.setString(2, destination.getQualifiedName()); 452 s.setString(3, clientId); 453 s.setString(4, subscriptionName); 454 455 if (this.batchStatments) { 456 s.addBatch(); 457 } else if (s.executeUpdate() != 1) { 458 throw new IOException("Could not update last ack seq : " 459 + seq + ", for sub: " + subscriptionName); 460 } 461 } finally { 462 cleanupExclusiveLock.readLock().unlock(); 463 if (!this.batchStatments) { 464 close(s); 465 } 466 } 467 } 468 469 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 470 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { 471 // dumpTables(c, 472 // destination.getQualifiedName(),clientId,subscriptionName); 473 PreparedStatement s = null; 474 ResultSet rs = null; 475 cleanupExclusiveLock.readLock().lock(); 476 try { 477 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); 478 s.setString(1, destination.getQualifiedName()); 479 s.setString(2, clientId); 480 s.setString(3, subscriptionName); 481 rs = s.executeQuery(); 482 if (this.statements.isUseExternalMessageReferences()) { 483 while (rs.next()) { 484 if (!listener.recoverMessageReference(rs.getString(2))) { 485 break; 486 } 487 } 488 } else { 489 while (rs.next()) { 490 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 491 break; 492 } 493 } 494 } 495 } finally { 496 cleanupExclusiveLock.readLock().unlock(); 497 close(rs); 498 close(s); 499 } 500 } 501 502 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, 503 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 504 505 PreparedStatement s = null; 506 ResultSet rs = null; 507 cleanupExclusiveLock.readLock().lock(); 508 try { 509 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); 510 s.setMaxRows(Math.max(maxReturned * 2, maxRows)); 511 s.setString(1, destination.getQualifiedName()); 512 s.setString(2, clientId); 513 s.setString(3, subscriptionName); 514 s.setLong(4, seq); 515 rs = s.executeQuery(); 516 int count = 0; 517 if (this.statements.isUseExternalMessageReferences()) { 518 while (rs.next() && count < maxReturned) { 519 if (listener.recoverMessageReference(rs.getString(1))) { 520 count++; 521 } 522 } 523 } else { 524 while (rs.next() && count < maxReturned) { 525 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 526 count++; 527 } 528 } 529 } 530 } finally { 531 cleanupExclusiveLock.readLock().unlock(); 532 close(rs); 533 close(s); 534 } 535 } 536 537 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, 538 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 539 540 PreparedStatement s = null; 541 ResultSet rs = null; 542 cleanupExclusiveLock.readLock().lock(); 543 try { 544 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); 545 s.setMaxRows(Math.max(maxReturned * 2, maxRows)); 546 s.setString(1, destination.getQualifiedName()); 547 s.setString(2, clientId); 548 s.setString(3, subscriptionName); 549 s.setLong(4, seq); 550 s.setLong(5, priority); 551 rs = s.executeQuery(); 552 int count = 0; 553 if (this.statements.isUseExternalMessageReferences()) { 554 while (rs.next() && count < maxReturned) { 555 if (listener.recoverMessageReference(rs.getString(1))) { 556 count++; 557 } 558 } 559 } else { 560 while (rs.next() && count < maxReturned) { 561 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 562 count++; 563 } 564 } 565 } 566 } finally { 567 cleanupExclusiveLock.readLock().unlock(); 568 close(rs); 569 close(s); 570 } 571 } 572 573 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, 574 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException { 575 PreparedStatement s = null; 576 ResultSet rs = null; 577 int result = 0; 578 cleanupExclusiveLock.readLock().lock(); 579 try { 580 if (isPrioritizedMessages) { 581 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority()); 582 } else { 583 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); 584 } 585 s.setString(1, destination.getQualifiedName()); 586 s.setString(2, clientId); 587 s.setString(3, subscriptionName); 588 rs = s.executeQuery(); 589 if (rs.next()) { 590 result = rs.getInt(1); 591 } 592 } finally { 593 cleanupExclusiveLock.readLock().unlock(); 594 close(rs); 595 close(s); 596 } 597 return result; 598 } 599 600 /** 601 * @param c 602 * @param info 603 * @param retroactive 604 * @throws SQLException 605 * @throws IOException 606 */ 607 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages) 608 throws SQLException, IOException { 609 // dumpTables(c, destination.getQualifiedName(), clientId, 610 // subscriptionName); 611 PreparedStatement s = null; 612 cleanupExclusiveLock.readLock().lock(); 613 try { 614 long lastMessageId = -1; 615 if (!retroactive) { 616 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 617 ResultSet rs = null; 618 try { 619 rs = s.executeQuery(); 620 if (rs.next()) { 621 lastMessageId = rs.getLong(1); 622 } 623 } finally { 624 close(rs); 625 close(s); 626 } 627 } 628 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 629 int maxPriority = 1; 630 if (isPrioritizedMessages) { 631 maxPriority = 10; 632 } 633 634 for (int priority = 0; priority < maxPriority; priority++) { 635 s.setString(1, info.getDestination().getQualifiedName()); 636 s.setString(2, info.getClientId()); 637 s.setString(3, info.getSubscriptionName()); 638 s.setString(4, info.getSelector()); 639 s.setLong(5, lastMessageId); 640 s.setString(6, info.getSubscribedDestination().getQualifiedName()); 641 s.setLong(7, priority); 642 643 if (s.executeUpdate() != 1) { 644 throw new IOException("Could not create durable subscription for: " + info.getClientId()); 645 } 646 } 647 648 } finally { 649 cleanupExclusiveLock.readLock().unlock(); 650 close(s); 651 } 652 } 653 654 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 655 String clientId, String subscriptionName) throws SQLException, IOException { 656 PreparedStatement s = null; 657 ResultSet rs = null; 658 cleanupExclusiveLock.readLock().lock(); 659 try { 660 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); 661 s.setString(1, destination.getQualifiedName()); 662 s.setString(2, clientId); 663 s.setString(3, subscriptionName); 664 rs = s.executeQuery(); 665 if (!rs.next()) { 666 return null; 667 } 668 SubscriptionInfo subscription = new SubscriptionInfo(); 669 subscription.setDestination(destination); 670 subscription.setClientId(clientId); 671 subscription.setSubscriptionName(subscriptionName); 672 subscription.setSelector(rs.getString(1)); 673 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), 674 ActiveMQDestination.QUEUE_TYPE)); 675 return subscription; 676 } finally { 677 cleanupExclusiveLock.readLock().unlock(); 678 close(rs); 679 close(s); 680 } 681 } 682 683 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) 684 throws SQLException, IOException { 685 PreparedStatement s = null; 686 ResultSet rs = null; 687 cleanupExclusiveLock.readLock().lock(); 688 try { 689 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); 690 s.setString(1, destination.getQualifiedName()); 691 rs = s.executeQuery(); 692 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); 693 while (rs.next()) { 694 SubscriptionInfo subscription = new SubscriptionInfo(); 695 subscription.setDestination(destination); 696 subscription.setSelector(rs.getString(1)); 697 subscription.setSubscriptionName(rs.getString(2)); 698 subscription.setClientId(rs.getString(3)); 699 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), 700 ActiveMQDestination.QUEUE_TYPE)); 701 rc.add(subscription); 702 } 703 return rc.toArray(new SubscriptionInfo[rc.size()]); 704 } finally { 705 cleanupExclusiveLock.readLock().unlock(); 706 close(rs); 707 close(s); 708 } 709 } 710 711 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, 712 IOException { 713 PreparedStatement s = null; 714 cleanupExclusiveLock.readLock().lock(); 715 try { 716 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); 717 s.setString(1, destinationName.getQualifiedName()); 718 s.executeUpdate(); 719 s.close(); 720 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); 721 s.setString(1, destinationName.getQualifiedName()); 722 s.executeUpdate(); 723 } finally { 724 cleanupExclusiveLock.readLock().unlock(); 725 close(s); 726 } 727 } 728 729 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 730 String subscriptionName) throws SQLException, IOException { 731 PreparedStatement s = null; 732 cleanupExclusiveLock.readLock().lock(); 733 try { 734 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); 735 s.setString(1, destination.getQualifiedName()); 736 s.setString(2, clientId); 737 s.setString(3, subscriptionName); 738 s.executeUpdate(); 739 } finally { 740 cleanupExclusiveLock.readLock().unlock(); 741 close(s); 742 } 743 } 744 745 int priorityIterator = 0; 746 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { 747 PreparedStatement s = null; 748 cleanupExclusiveLock.writeLock().lock(); 749 try { 750 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority()); 751 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority()); 752 int priority = priorityIterator++%10; 753 s.setInt(1, priority); 754 s.setInt(2, priority); 755 int i = s.executeUpdate(); 756 LOG.debug("Deleted " + i + " old message(s) at priority: " + priority); 757 } finally { 758 cleanupExclusiveLock.writeLock().unlock(); 759 close(s); 760 } 761 } 762 763 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, 764 String clientId, String subscriberName) throws SQLException, IOException { 765 PreparedStatement s = null; 766 ResultSet rs = null; 767 long result = -1; 768 cleanupExclusiveLock.readLock().lock(); 769 try { 770 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); 771 s.setString(1, destination.getQualifiedName()); 772 s.setString(2, clientId); 773 s.setString(3, subscriberName); 774 rs = s.executeQuery(); 775 if (rs.next()) { 776 result = rs.getLong(1); 777 if (result == 0 && rs.wasNull()) { 778 result = -1; 779 } 780 } 781 } finally { 782 cleanupExclusiveLock.readLock().unlock(); 783 close(rs); 784 close(s); 785 } 786 return result; 787 } 788 789 protected static void close(PreparedStatement s) { 790 try { 791 s.close(); 792 } catch (Throwable e) { 793 } 794 } 795 796 protected static void close(ResultSet rs) { 797 try { 798 rs.close(); 799 } catch (Throwable e) { 800 } 801 } 802 803 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { 804 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 805 PreparedStatement s = null; 806 ResultSet rs = null; 807 cleanupExclusiveLock.readLock().lock(); 808 try { 809 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); 810 rs = s.executeQuery(); 811 while (rs.next()) { 812 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); 813 } 814 } finally { 815 cleanupExclusiveLock.readLock().unlock(); 816 close(rs); 817 close(s); 818 } 819 return rc; 820 } 821 822 /** 823 * @return true if batchStements 824 */ 825 public boolean isBatchStatments() { 826 return this.batchStatments; 827 } 828 829 /** 830 * @param batchStatments 831 */ 832 public void setBatchStatments(boolean batchStatments) { 833 this.batchStatments = batchStatments; 834 } 835 836 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 837 this.statements.setUseExternalMessageReferences(useExternalMessageReferences); 838 } 839 840 /** 841 * @return the statements 842 */ 843 public Statements getStatements() { 844 return this.statements; 845 } 846 847 public void setStatements(Statements statements) { 848 this.statements = statements; 849 } 850 851 public int getMaxRows() { 852 return maxRows; 853 } 854 855 public void setMaxRows(int maxRows) { 856 this.maxRows = maxRows; 857 } 858 859 @Override 860 public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { 861 PreparedStatement s = null; 862 cleanupExclusiveLock.readLock().lock(); 863 try { 864 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 865 s.setString(1, destination.getQualifiedName()); 866 s.setString(2, destination.getQualifiedName()); 867 s.setString(3, destination.getQualifiedName()); 868 s.setString(4, null); 869 s.setLong(5, 0); 870 s.setString(6, destination.getQualifiedName()); 871 s.setLong(7, 11); // entry out of priority range 872 873 if (s.executeUpdate() != 1) { 874 throw new IOException("Could not create ack record for destination: " + destination); 875 } 876 } finally { 877 cleanupExclusiveLock.readLock().unlock(); 878 close(s); 879 } 880 } 881 882 /** 883 * @param c 884 * @param destination 885 * @param clientId 886 * @param subscriberName 887 * @return 888 * @throws SQLException 889 * @throws IOException 890 */ 891 public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination, 892 String clientId, String subscriberName) throws SQLException, IOException { 893 PreparedStatement s = null; 894 ResultSet rs = null; 895 cleanupExclusiveLock.readLock().lock(); 896 try { 897 s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement()); 898 s.setString(1, destination.getQualifiedName()); 899 s.setString(2, clientId); 900 s.setString(3, subscriberName); 901 rs = s.executeQuery(); 902 if (!rs.next()) { 903 return null; 904 } 905 return getBinaryData(rs, 1); 906 } finally { 907 close(rs); 908 cleanupExclusiveLock.readLock().unlock(); 909 close(s); 910 } 911 } 912 913 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, 914 IOException { 915 PreparedStatement s = null; 916 ResultSet rs = null; 917 int result = 0; 918 cleanupExclusiveLock.readLock().lock(); 919 try { 920 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); 921 s.setString(1, destination.getQualifiedName()); 922 rs = s.executeQuery(); 923 if (rs.next()) { 924 result = rs.getInt(1); 925 } 926 } finally { 927 cleanupExclusiveLock.readLock().unlock(); 928 close(rs); 929 close(s); 930 } 931 return result; 932 } 933 934 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, 935 long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { 936 PreparedStatement s = null; 937 ResultSet rs = null; 938 cleanupExclusiveLock.readLock().lock(); 939 try { 940 if (isPrioritizedMessages) { 941 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement()); 942 } else { 943 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); 944 } 945 s.setMaxRows(Math.max(maxReturned * 2, maxRows)); 946 s.setString(1, destination.getQualifiedName()); 947 s.setLong(2, nextSeq); 948 if (isPrioritizedMessages) { 949 s.setLong(3, priority); 950 s.setLong(4, priority); 951 } 952 rs = s.executeQuery(); 953 int count = 0; 954 if (this.statements.isUseExternalMessageReferences()) { 955 while (rs.next() && count < maxReturned) { 956 if (listener.recoverMessageReference(rs.getString(1))) { 957 count++; 958 } else { 959 LOG.debug("Stopped recover next messages"); 960 break; 961 } 962 } 963 } else { 964 while (rs.next() && count < maxReturned) { 965 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 966 count++; 967 } else { 968 LOG.debug("Stopped recover next messages"); 969 break; 970 } 971 } 972 } 973 } catch (Exception e) { 974 e.printStackTrace(); 975 } finally { 976 cleanupExclusiveLock.readLock().unlock(); 977 close(rs); 978 close(s); 979 } 980 } 981 982 /* public void dumpTables(Connection c, String destinationName, String clientId, String 983 subscriptionName) throws SQLException { 984 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 985 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 986 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 987 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 988 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 989 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 990 + " ORDER BY M.ID"); 991 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); 992 printQuery(s,System.out); } 993 994 public void dumpTables(Connection c) throws SQLException { 995 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 996 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 997 } 998 999 private void printQuery(Connection c, String query, PrintStream out) 1000 throws SQLException { 1001 printQuery(c.prepareStatement(query), out); 1002 } 1003 1004 private void printQuery(PreparedStatement s, PrintStream out) 1005 throws SQLException { 1006 1007 ResultSet set = null; 1008 try { 1009 set = s.executeQuery(); 1010 ResultSetMetaData metaData = set.getMetaData(); 1011 for (int i = 1; i <= metaData.getColumnCount(); i++) { 1012 if (i == 1) 1013 out.print("||"); 1014 out.print(metaData.getColumnName(i) + "||"); 1015 } 1016 out.println(); 1017 while (set.next()) { 1018 for (int i = 1; i <= metaData.getColumnCount(); i++) { 1019 if (i == 1) 1020 out.print("|"); 1021 out.print(set.getString(i) + "|"); 1022 } 1023 out.println(); 1024 } 1025 } finally { 1026 try { 1027 set.close(); 1028 } catch (Throwable ignore) { 1029 } 1030 try { 1031 s.close(); 1032 } catch (Throwable ignore) { 1033 } 1034 } 1035 } */ 1036 1037 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) 1038 throws SQLException, IOException { 1039 PreparedStatement s = null; 1040 ResultSet rs = null; 1041 cleanupExclusiveLock.readLock().lock(); 1042 try { 1043 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); 1044 s.setString(1, id.toString()); 1045 rs = s.executeQuery(); 1046 long seq = -1; 1047 if (rs.next()) { 1048 seq = rs.getLong(1); 1049 } 1050 return seq; 1051 } finally { 1052 cleanupExclusiveLock.readLock().unlock(); 1053 close(rs); 1054 close(s); 1055 } 1056 } 1057 1058 }