001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.io.PrintStream;
021    import java.sql.Connection;
022    import java.sql.PreparedStatement;
023    import java.sql.ResultSet;
024    import java.sql.ResultSetMetaData;
025    import java.sql.SQLException;
026    import java.sql.Statement;
027    import java.util.ArrayList;
028    import java.util.HashSet;
029    import java.util.LinkedList;
030    import java.util.Set;
031    import java.util.concurrent.locks.ReadWriteLock;
032    import java.util.concurrent.locks.ReentrantReadWriteLock;
033    
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.MessageId;
036    import org.apache.activemq.command.ProducerId;
037    import org.apache.activemq.command.SubscriptionInfo;
038    import org.apache.activemq.store.jdbc.JDBCAdapter;
039    import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
040    import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
041    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
042    import org.apache.activemq.store.jdbc.Statements;
043    import org.apache.activemq.store.jdbc.TransactionContext;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
049     * encouraged to override the default implementation of methods to account for differences in JDBC Driver
050     * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
051     * The databases/JDBC drivers that use this adapter are:
052     * <ul>
053     * <li></li>
054     * </ul>
055     * 
056     * @org.apache.xbean.XBean element="defaultJDBCAdapter"
057     * 
058     * 
059     */
060    public class DefaultJDBCAdapter implements JDBCAdapter {
061        private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
062        public static final int MAX_ROWS = 10000;
063        protected Statements statements;
064        protected boolean batchStatments = true;
065        protected boolean prioritizedMessages;
066        protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
067        // needs to be min twice the prefetch for a durable sub and large enough for selector range
068        protected int maxRows = MAX_ROWS;
069    
070        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
071            s.setBytes(index, data);
072        }
073    
074        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
075            return rs.getBytes(index);
076        }
077    
078        public void doCreateTables(TransactionContext c) throws SQLException, IOException {
079            Statement s = null;
080            cleanupExclusiveLock.writeLock().lock();
081            try {
082                // Check to see if the table already exists. If it does, then don't
083                // log warnings during startup.
084                // Need to run the scripts anyways since they may contain ALTER
085                // statements that upgrade a previous version
086                // of the table
087                boolean alreadyExists = false;
088                ResultSet rs = null;
089                try {
090                    rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
091                            new String[] { "TABLE" });
092                    alreadyExists = rs.next();
093                } catch (Throwable ignore) {
094                } finally {
095                    close(rs);
096                }
097                s = c.getConnection().createStatement();
098                String[] createStatments = this.statements.getCreateSchemaStatements();
099                for (int i = 0; i < createStatments.length; i++) {
100                    // This will fail usually since the tables will be
101                    // created already.
102                    try {
103                        LOG.debug("Executing SQL: " + createStatments[i]);
104                        s.execute(createStatments[i]);
105                    } catch (SQLException e) {
106                        if (alreadyExists) {
107                            LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
108                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
109                                    + " Vendor code: " + e.getErrorCode());
110                        } else {
111                            LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
112                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
113                                    + " Vendor code: " + e.getErrorCode());
114                            JDBCPersistenceAdapter.log("Failure details: ", e);
115                        }
116                    }
117                }
118                c.getConnection().commit();
119            } finally {
120                cleanupExclusiveLock.writeLock().unlock();
121                try {
122                    s.close();
123                } catch (Throwable e) {
124                }
125            }
126        }
127    
128        public void doDropTables(TransactionContext c) throws SQLException, IOException {
129            Statement s = null;
130            cleanupExclusiveLock.writeLock().lock();
131            try {
132                s = c.getConnection().createStatement();
133                String[] dropStatments = this.statements.getDropSchemaStatements();
134                for (int i = 0; i < dropStatments.length; i++) {
135                    // This will fail usually since the tables will be
136                    // created already.
137                    try {
138                        LOG.debug("Executing SQL: " + dropStatments[i]);
139                        s.execute(dropStatments[i]);
140                    } catch (SQLException e) {
141                        LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
142                                + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
143                                + e.getErrorCode());
144                        JDBCPersistenceAdapter.log("Failure details: ", e);
145                    }
146                }
147                c.getConnection().commit();
148            } finally {
149                cleanupExclusiveLock.writeLock().unlock();
150                try {
151                    s.close();
152                } catch (Throwable e) {
153                }
154            }
155        }
156    
157        public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
158            PreparedStatement s = null;
159            ResultSet rs = null;
160            cleanupExclusiveLock.readLock().lock();
161            try {
162                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
163                rs = s.executeQuery();
164                long seq1 = 0;
165                if (rs.next()) {
166                    seq1 = rs.getLong(1);
167                }
168                rs.close();
169                s.close();
170                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
171                rs = s.executeQuery();
172                long seq2 = 0;
173                if (rs.next()) {
174                    seq2 = rs.getLong(1);
175                }
176                long seq = Math.max(seq1, seq2);
177                return seq;
178            } finally {
179                cleanupExclusiveLock.readLock().unlock();
180                close(rs);
181                close(s);
182            }
183        }
184        
185        public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
186            PreparedStatement s = null;
187            ResultSet rs = null;
188            cleanupExclusiveLock.readLock().lock();
189            try {
190                s = c.getConnection().prepareStatement(
191                        this.statements.getFindMessageByIdStatement());
192                s.setLong(1, storeSequenceId);
193                rs = s.executeQuery();
194                if (!rs.next()) {
195                    return null;
196                }
197                return getBinaryData(rs, 1);
198            } finally {
199                cleanupExclusiveLock.readLock().unlock();
200                close(rs);
201                close(s);
202            }
203        }
204        
205    
206        public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
207                long expiration, byte priority) throws SQLException, IOException {
208            PreparedStatement s = c.getAddMessageStatement();
209            cleanupExclusiveLock.readLock().lock();
210            try {
211                if (s == null) {
212                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
213                    if (this.batchStatments) {
214                        c.setAddMessageStatement(s);
215                    }
216                }
217                s.setLong(1, sequence);
218                s.setString(2, messageID.getProducerId().toString());
219                s.setLong(3, messageID.getProducerSequenceId());
220                s.setString(4, destination.getQualifiedName());
221                s.setLong(5, expiration);
222                s.setLong(6, priority);
223                setBinaryData(s, 7, data);
224                if (this.batchStatments) {
225                    s.addBatch();
226                } else if (s.executeUpdate() != 1) {
227                    throw new SQLException("Failed add a message");
228                }
229            } finally {
230                cleanupExclusiveLock.readLock().unlock();
231                if (!this.batchStatments) {
232                    if (s != null) {
233                        s.close();
234                    }
235                }
236            }
237        }
238    
239        public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
240                long expirationTime, String messageRef) throws SQLException, IOException {
241            PreparedStatement s = c.getAddMessageStatement();
242            cleanupExclusiveLock.readLock().lock();
243            try {
244                if (s == null) {
245                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
246                    if (this.batchStatments) {
247                        c.setAddMessageStatement(s);
248                    }
249                }
250                s.setLong(1, messageID.getBrokerSequenceId());
251                s.setString(2, messageID.getProducerId().toString());
252                s.setLong(3, messageID.getProducerSequenceId());
253                s.setString(4, destination.getQualifiedName());
254                s.setLong(5, expirationTime);
255                s.setString(6, messageRef);
256                if (this.batchStatments) {
257                    s.addBatch();
258                } else if (s.executeUpdate() != 1) {
259                    throw new SQLException("Failed add a message");
260                }
261            } finally {
262                cleanupExclusiveLock.readLock().unlock();
263                if (!this.batchStatments) {
264                    s.close();
265                }
266            }
267        }
268    
269        public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
270            PreparedStatement s = null;
271            ResultSet rs = null;
272            cleanupExclusiveLock.readLock().lock();
273            try {
274                s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
275                s.setString(1, messageID.getProducerId().toString());
276                s.setLong(2, messageID.getProducerSequenceId());
277                s.setString(3, destination.getQualifiedName());
278                rs = s.executeQuery();
279                if (!rs.next()) {
280                    return new long[]{0,0};
281                }
282                return new long[]{rs.getLong(1), rs.getLong(2)};
283            } finally {
284                cleanupExclusiveLock.readLock().unlock();
285                close(rs);
286                close(s);
287            }
288        }
289    
290        public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
291            PreparedStatement s = null;
292            ResultSet rs = null;
293            cleanupExclusiveLock.readLock().lock();
294            try {
295                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
296                s.setString(1, id.getProducerId().toString());
297                s.setLong(2, id.getProducerSequenceId());
298                rs = s.executeQuery();
299                if (!rs.next()) {
300                    return null;
301                }
302                return getBinaryData(rs, 1);
303            } finally {
304                cleanupExclusiveLock.readLock().unlock();
305                close(rs);
306                close(s);
307            }
308        }
309    
310        public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
311            PreparedStatement s = null;
312            ResultSet rs = null;
313            cleanupExclusiveLock.readLock().lock();
314            try {
315                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
316                s.setLong(1, seq);
317                rs = s.executeQuery();
318                if (!rs.next()) {
319                    return null;
320                }
321                return rs.getString(1);
322            } finally {
323                cleanupExclusiveLock.readLock().unlock();
324                close(rs);
325                close(s);
326            }
327        }
328    
329        public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
330            PreparedStatement s = c.getRemovedMessageStatement();
331            cleanupExclusiveLock.readLock().lock();
332            try {
333                if (s == null) {
334                    s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
335                    if (this.batchStatments) {
336                        c.setRemovedMessageStatement(s);
337                    }
338                }
339                s.setLong(1, seq);
340                if (this.batchStatments) {
341                    s.addBatch();
342                } else if (s.executeUpdate() != 1) {
343                    throw new SQLException("Failed to remove message");
344                }
345            } finally {
346                cleanupExclusiveLock.readLock().unlock();
347                if (!this.batchStatments && s != null) {
348                    s.close();
349                }
350            }
351        }
352    
353        public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
354                throws Exception {
355            PreparedStatement s = null;
356            ResultSet rs = null;
357            cleanupExclusiveLock.readLock().lock();
358            try {
359                s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
360                s.setString(1, destination.getQualifiedName());
361                rs = s.executeQuery();
362                if (this.statements.isUseExternalMessageReferences()) {
363                    while (rs.next()) {
364                        if (!listener.recoverMessageReference(rs.getString(2))) {
365                            break;
366                        }
367                    }
368                } else {
369                    while (rs.next()) {
370                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
371                            break;
372                        }
373                    }
374                }
375            } finally {
376                cleanupExclusiveLock.readLock().unlock();
377                close(rs);
378                close(s);
379            }
380        }
381    
382        public void doMessageIdScan(TransactionContext c, int limit, 
383                JDBCMessageIdScanListener listener) throws SQLException, IOException {
384            PreparedStatement s = null;
385            ResultSet rs = null;
386            cleanupExclusiveLock.readLock().lock();
387            try {
388                s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
389                s.setMaxRows(limit);
390                rs = s.executeQuery();
391                // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
392                LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
393                while (rs.next()) {
394                    reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
395                }
396                if (LOG.isDebugEnabled()) {
397                    LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
398                }
399                for (MessageId id : reverseOrderIds) {
400                    listener.messageId(id);
401                }
402            } finally {
403                cleanupExclusiveLock.readLock().unlock();
404                close(rs);
405                close(s);
406            }
407        }
408        
409        public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
410                String subscriptionName, long seq, long prio) throws SQLException, IOException {
411            PreparedStatement s = c.getUpdateLastAckStatement();
412            cleanupExclusiveLock.readLock().lock();
413            try {
414                if (s == null) {
415                    s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
416                    if (this.batchStatments) {
417                        c.setUpdateLastAckStatement(s);
418                    }
419                }
420                s.setLong(1, seq);
421                s.setString(2, destination.getQualifiedName());
422                s.setString(3, clientId);
423                s.setString(4, subscriptionName);
424                s.setLong(5, prio);
425                if (this.batchStatments) {
426                    s.addBatch();
427                } else if (s.executeUpdate() != 1) {
428                    throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
429                }
430            } finally {
431                cleanupExclusiveLock.readLock().unlock();
432                if (!this.batchStatments) {
433                    close(s);
434                }
435            }
436        }
437    
438    
439        public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
440                                            String subscriptionName, long seq, long priority) throws SQLException, IOException {
441            PreparedStatement s = c.getUpdateLastAckStatement();
442            cleanupExclusiveLock.readLock().lock();
443            try {
444                if (s == null) {
445                    s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
446                    if (this.batchStatments) {
447                        c.setUpdateLastAckStatement(s);
448                    }
449                }
450                s.setLong(1, seq);
451                s.setString(2, destination.getQualifiedName());
452                s.setString(3, clientId);
453                s.setString(4, subscriptionName);
454    
455                if (this.batchStatments) {
456                    s.addBatch();
457                } else if (s.executeUpdate() != 1) {
458                    throw new IOException("Could not update last ack seq : "
459                                + seq + ", for sub: " + subscriptionName);
460                }
461            } finally {
462                cleanupExclusiveLock.readLock().unlock();
463                if (!this.batchStatments) {
464                    close(s);
465                }            
466            }
467        }
468    
469        public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
470                String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
471            // dumpTables(c,
472            // destination.getQualifiedName(),clientId,subscriptionName);
473            PreparedStatement s = null;
474            ResultSet rs = null;
475            cleanupExclusiveLock.readLock().lock();
476            try {
477                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
478                s.setString(1, destination.getQualifiedName());
479                s.setString(2, clientId);
480                s.setString(3, subscriptionName);
481                rs = s.executeQuery();
482                if (this.statements.isUseExternalMessageReferences()) {
483                    while (rs.next()) {
484                        if (!listener.recoverMessageReference(rs.getString(2))) {
485                            break;
486                        }
487                    }
488                } else {
489                    while (rs.next()) {
490                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
491                            break;
492                        }
493                    }
494                }
495            } finally {
496                cleanupExclusiveLock.readLock().unlock();
497                close(rs);
498                close(s);
499            }
500        }
501    
502        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
503                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
504            
505            PreparedStatement s = null;
506            ResultSet rs = null;
507            cleanupExclusiveLock.readLock().lock();
508            try {
509                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
510                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
511                s.setString(1, destination.getQualifiedName());
512                s.setString(2, clientId);
513                s.setString(3, subscriptionName);
514                s.setLong(4, seq);
515                rs = s.executeQuery();
516                int count = 0;
517                if (this.statements.isUseExternalMessageReferences()) {
518                    while (rs.next() && count < maxReturned) {
519                        if (listener.recoverMessageReference(rs.getString(1))) {
520                            count++;
521                        }
522                    }
523                } else {
524                    while (rs.next() && count < maxReturned) {
525                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
526                            count++;
527                        }
528                    }
529                }
530            } finally {
531                cleanupExclusiveLock.readLock().unlock();
532                close(rs);
533                close(s);
534            }
535        }
536    
537        public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
538                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
539    
540            PreparedStatement s = null;
541            ResultSet rs = null;
542            cleanupExclusiveLock.readLock().lock();
543            try {
544                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
545                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
546                s.setString(1, destination.getQualifiedName());
547                s.setString(2, clientId);
548                s.setString(3, subscriptionName);
549                s.setLong(4, seq);
550                s.setLong(5, priority);
551                rs = s.executeQuery();
552                int count = 0;
553                if (this.statements.isUseExternalMessageReferences()) {
554                    while (rs.next() && count < maxReturned) {
555                        if (listener.recoverMessageReference(rs.getString(1))) {
556                            count++;
557                        }
558                    }
559                } else {
560                    while (rs.next() && count < maxReturned) {
561                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
562                            count++;
563                        }
564                    }
565                }
566            } finally {
567                cleanupExclusiveLock.readLock().unlock();
568                close(rs);
569                close(s);
570            }
571        }
572    
573        public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
574                String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
575            PreparedStatement s = null;
576            ResultSet rs = null;
577            int result = 0;
578            cleanupExclusiveLock.readLock().lock();
579            try {
580                if (isPrioritizedMessages) {
581                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
582                } else {
583                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
584                }
585                s.setString(1, destination.getQualifiedName());
586                s.setString(2, clientId);
587                s.setString(3, subscriptionName);
588                rs = s.executeQuery();
589                if (rs.next()) {
590                    result = rs.getInt(1);
591                }
592            } finally {
593                cleanupExclusiveLock.readLock().unlock();
594                close(rs);
595                close(s);
596            }
597            return result;
598        }
599    
600        /**
601         * @param c 
602         * @param info 
603         * @param retroactive 
604         * @throws SQLException 
605         * @throws IOException 
606         */
607        public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
608                throws SQLException, IOException {
609            // dumpTables(c, destination.getQualifiedName(), clientId,
610            // subscriptionName);
611            PreparedStatement s = null;
612            cleanupExclusiveLock.readLock().lock();
613            try {
614                long lastMessageId = -1;
615                if (!retroactive) {
616                    s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
617                    ResultSet rs = null;
618                    try {
619                        rs = s.executeQuery();
620                        if (rs.next()) {
621                            lastMessageId = rs.getLong(1);
622                        }
623                    } finally {
624                        close(rs);
625                        close(s);
626                    }
627                }
628                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
629                int maxPriority = 1;
630                if (isPrioritizedMessages) {
631                    maxPriority = 10;
632                }
633    
634                for (int priority = 0; priority < maxPriority; priority++) {
635                    s.setString(1, info.getDestination().getQualifiedName());
636                    s.setString(2, info.getClientId());
637                    s.setString(3, info.getSubscriptionName());
638                    s.setString(4, info.getSelector());
639                    s.setLong(5, lastMessageId);
640                    s.setString(6, info.getSubscribedDestination().getQualifiedName());
641                    s.setLong(7, priority);
642    
643                    if (s.executeUpdate() != 1) {
644                        throw new IOException("Could not create durable subscription for: " + info.getClientId());
645                    }
646                }
647    
648            } finally {
649                cleanupExclusiveLock.readLock().unlock();
650                close(s);
651            }
652        }
653    
654        public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
655                String clientId, String subscriptionName) throws SQLException, IOException {
656            PreparedStatement s = null;
657            ResultSet rs = null;
658            cleanupExclusiveLock.readLock().lock();
659            try {
660                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
661                s.setString(1, destination.getQualifiedName());
662                s.setString(2, clientId);
663                s.setString(3, subscriptionName);
664                rs = s.executeQuery();
665                if (!rs.next()) {
666                    return null;
667                }
668                SubscriptionInfo subscription = new SubscriptionInfo();
669                subscription.setDestination(destination);
670                subscription.setClientId(clientId);
671                subscription.setSubscriptionName(subscriptionName);
672                subscription.setSelector(rs.getString(1));
673                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
674                        ActiveMQDestination.QUEUE_TYPE));
675                return subscription;
676            } finally {
677                cleanupExclusiveLock.readLock().unlock();
678                close(rs);
679                close(s);
680            }
681        }
682    
683        public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
684                throws SQLException, IOException {
685            PreparedStatement s = null;
686            ResultSet rs = null;
687            cleanupExclusiveLock.readLock().lock();
688            try {
689                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
690                s.setString(1, destination.getQualifiedName());
691                rs = s.executeQuery();
692                ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
693                while (rs.next()) {
694                    SubscriptionInfo subscription = new SubscriptionInfo();
695                    subscription.setDestination(destination);
696                    subscription.setSelector(rs.getString(1));
697                    subscription.setSubscriptionName(rs.getString(2));
698                    subscription.setClientId(rs.getString(3));
699                    subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
700                            ActiveMQDestination.QUEUE_TYPE));
701                    rc.add(subscription);
702                }
703                return rc.toArray(new SubscriptionInfo[rc.size()]);
704            } finally {
705                cleanupExclusiveLock.readLock().unlock();
706                close(rs);
707                close(s);
708            }
709        }
710    
711        public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
712                IOException {
713            PreparedStatement s = null;
714            cleanupExclusiveLock.readLock().lock();
715            try {
716                s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
717                s.setString(1, destinationName.getQualifiedName());
718                s.executeUpdate();
719                s.close();
720                s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
721                s.setString(1, destinationName.getQualifiedName());
722                s.executeUpdate();
723            } finally {
724                cleanupExclusiveLock.readLock().unlock();
725                close(s);
726            }
727        }
728    
729        public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
730                String subscriptionName) throws SQLException, IOException {
731            PreparedStatement s = null;
732            cleanupExclusiveLock.readLock().lock();
733            try {
734                s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
735                s.setString(1, destination.getQualifiedName());
736                s.setString(2, clientId);
737                s.setString(3, subscriptionName);
738                s.executeUpdate();
739            } finally {
740                cleanupExclusiveLock.readLock().unlock();
741                close(s);
742            }
743        }
744    
745        int priorityIterator = 0;
746        public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
747            PreparedStatement s = null;
748            cleanupExclusiveLock.writeLock().lock();
749            try {
750                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
751                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
752                int priority = priorityIterator++%10;
753                s.setInt(1, priority);
754                s.setInt(2, priority);
755                int i = s.executeUpdate();
756                LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
757            } finally {
758                cleanupExclusiveLock.writeLock().unlock();
759                close(s);
760            }
761        }
762    
763        public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
764                String clientId, String subscriberName) throws SQLException, IOException {
765            PreparedStatement s = null;
766            ResultSet rs = null;
767            long result = -1;
768            cleanupExclusiveLock.readLock().lock();
769            try {
770                s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
771                s.setString(1, destination.getQualifiedName());
772                s.setString(2, clientId);
773                s.setString(3, subscriberName);
774                rs = s.executeQuery();
775                if (rs.next()) {
776                    result = rs.getLong(1);
777                    if (result == 0 && rs.wasNull()) {
778                        result = -1;
779                    }
780                }
781            } finally {
782                cleanupExclusiveLock.readLock().unlock();
783                close(rs);
784                close(s);
785            }
786            return result;
787        }
788    
789        protected static void close(PreparedStatement s) {
790            try {
791                s.close();
792            } catch (Throwable e) {
793            }
794        }
795    
796        protected static void close(ResultSet rs) {
797            try {
798                rs.close();
799            } catch (Throwable e) {
800            }
801        }
802    
803        public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
804            HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
805            PreparedStatement s = null;
806            ResultSet rs = null;
807            cleanupExclusiveLock.readLock().lock();
808            try {
809                s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
810                rs = s.executeQuery();
811                while (rs.next()) {
812                    rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
813                }
814            } finally {
815                cleanupExclusiveLock.readLock().unlock();
816                close(rs);
817                close(s);
818            }
819            return rc;
820        }
821    
822        /**
823         * @return true if batchStements
824         */
825        public boolean isBatchStatments() {
826            return this.batchStatments;
827        }
828    
829        /**
830         * @param batchStatments
831         */
832        public void setBatchStatments(boolean batchStatments) {
833            this.batchStatments = batchStatments;
834        }
835    
836        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
837            this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
838        }
839    
840        /**
841         * @return the statements
842         */
843        public Statements getStatements() {
844            return this.statements;
845        }
846    
847        public void setStatements(Statements statements) {
848            this.statements = statements;
849        }
850    
851        public int getMaxRows() {
852            return maxRows;
853        }
854    
855        public void setMaxRows(int maxRows) {
856            this.maxRows = maxRows;
857        }
858    
859        @Override
860        public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
861            PreparedStatement s = null;
862            cleanupExclusiveLock.readLock().lock();
863            try {
864                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
865                s.setString(1, destination.getQualifiedName());
866                s.setString(2, destination.getQualifiedName());
867                s.setString(3, destination.getQualifiedName());
868                s.setString(4, null);
869                s.setLong(5, 0);
870                s.setString(6, destination.getQualifiedName());
871                s.setLong(7, 11);  // entry out of priority range
872    
873                if (s.executeUpdate() != 1) {
874                    throw new IOException("Could not create ack record for destination: " + destination);
875                }
876            } finally {
877                cleanupExclusiveLock.readLock().unlock();
878                close(s);
879            }
880        }
881    
882        /**
883         * @param c
884         * @param destination
885         * @param clientId
886         * @param subscriberName
887         * @return
888         * @throws SQLException
889         * @throws IOException
890         */
891        public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
892                String clientId, String subscriberName) throws SQLException, IOException {
893            PreparedStatement s = null;
894            ResultSet rs = null;
895            cleanupExclusiveLock.readLock().lock();
896            try {
897                s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
898                s.setString(1, destination.getQualifiedName());
899                s.setString(2, clientId);
900                s.setString(3, subscriberName);
901                rs = s.executeQuery();
902                if (!rs.next()) {
903                    return null;
904                }
905                return getBinaryData(rs, 1);
906            } finally {
907                close(rs);
908                cleanupExclusiveLock.readLock().unlock();
909                close(s);
910            }
911        }
912    
913        public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
914                IOException {
915            PreparedStatement s = null;
916            ResultSet rs = null;
917            int result = 0;
918            cleanupExclusiveLock.readLock().lock();
919            try {
920                s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
921                s.setString(1, destination.getQualifiedName());
922                rs = s.executeQuery();
923                if (rs.next()) {
924                    result = rs.getInt(1);
925                }
926            } finally {
927                cleanupExclusiveLock.readLock().unlock();
928                close(rs);
929                close(s);
930            }
931            return result;
932        }
933    
934        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
935                long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
936            PreparedStatement s = null;
937            ResultSet rs = null;
938            cleanupExclusiveLock.readLock().lock();
939            try {
940                if (isPrioritizedMessages) {
941                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
942                } else {
943                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
944                }
945                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
946                s.setString(1, destination.getQualifiedName());
947                s.setLong(2, nextSeq);
948                if (isPrioritizedMessages) {
949                    s.setLong(3, priority);
950                    s.setLong(4, priority);
951                }
952                rs = s.executeQuery();
953                int count = 0;
954                if (this.statements.isUseExternalMessageReferences()) {
955                    while (rs.next() && count < maxReturned) {
956                        if (listener.recoverMessageReference(rs.getString(1))) {
957                            count++;
958                        } else {
959                            LOG.debug("Stopped recover next messages");
960                            break;
961                        }
962                    }
963                } else {
964                    while (rs.next() && count < maxReturned) {
965                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
966                            count++;
967                        } else {
968                            LOG.debug("Stopped recover next messages");
969                            break;
970                        }
971                    }
972                }
973            } catch (Exception e) {
974                e.printStackTrace();
975            } finally {
976                cleanupExclusiveLock.readLock().unlock();
977                close(rs);
978                close(s);
979            }
980        }
981        
982    /*    public void dumpTables(Connection c, String destinationName, String clientId, String
983          subscriptionName) throws SQLException { 
984            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
985            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
986            PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
987                    + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
988                    + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
989                    + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
990                    + " ORDER BY M.ID");
991          s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
992          printQuery(s,System.out); }
993    
994        public void dumpTables(Connection c) throws SQLException {
995            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
996            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
997        }
998    
999        private void printQuery(Connection c, String query, PrintStream out)
1000                throws SQLException {
1001            printQuery(c.prepareStatement(query), out);
1002        }
1003    
1004        private void printQuery(PreparedStatement s, PrintStream out)
1005                throws SQLException {
1006    
1007            ResultSet set = null;
1008            try {
1009                set = s.executeQuery();
1010                ResultSetMetaData metaData = set.getMetaData();
1011                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1012                    if (i == 1)
1013                        out.print("||");
1014                    out.print(metaData.getColumnName(i) + "||");
1015                }
1016                out.println();
1017                while (set.next()) {
1018                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
1019                        if (i == 1)
1020                            out.print("|");
1021                        out.print(set.getString(i) + "|");
1022                    }
1023                    out.println();
1024                }
1025            } finally {
1026                try {
1027                    set.close();
1028                } catch (Throwable ignore) {
1029                }
1030                try {
1031                    s.close();
1032                } catch (Throwable ignore) {
1033                }
1034            }
1035        }  */
1036    
1037        public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1038                throws SQLException, IOException {
1039            PreparedStatement s = null;
1040            ResultSet rs = null;
1041            cleanupExclusiveLock.readLock().lock();
1042            try {
1043                s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1044                s.setString(1, id.toString());
1045                rs = s.executeQuery();
1046                long seq = -1;
1047                if (rs.next()) {
1048                    seq = rs.getLong(1);
1049                }
1050                return seq;
1051            } finally {
1052                cleanupExclusiveLock.readLock().unlock();
1053                close(rs);
1054                close(s);
1055            }
1056        }
1057    
1058    }