001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.sql.Blob;
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027
028import javax.jms.JMSException;
029import javax.sql.rowset.serial.SerialBlob;
030
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.store.jdbc.TransactionContext;
034import org.apache.activemq.util.ByteArrayOutputStream;
035
036/**
037 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
038 * operations. This is a little more involved since to insert a blob you have
039 * to:
040 * 
041 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
042 * value.
043 * 
044 * The databases/JDBC drivers that use this adapter are:
045 * <ul>
046 * <li></li>
047 * </ul>
048 * 
049 * @org.apache.xbean.XBean element="blobJDBCAdapter"
050 * 
051 * 
052 */
053public class BlobJDBCAdapter extends DefaultJDBCAdapter {
054
055    @Override
056    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
057            long expiration, byte priority) throws SQLException, IOException {
058        PreparedStatement s = null;
059        ResultSet rs = null;
060        cleanupExclusiveLock.readLock().lock();
061        try {
062            // Add the Blob record.
063            s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
064            s.setLong(1, sequence);
065            s.setString(2, messageID.getProducerId().toString());
066            s.setLong(3, messageID.getProducerSequenceId());
067            s.setString(4, destination.getQualifiedName());
068            s.setLong(5, expiration);
069            s.setLong(6, priority);
070
071            if (s.executeUpdate() != 1) {
072                throw new IOException("Failed to add broker message: " + messageID + " in container.");
073            }
074            s.close();
075
076            // Select the blob record so that we can update it.
077            s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
078                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
079            s.setLong(1, sequence);
080            rs = s.executeQuery();
081            if (!rs.next()) {
082                throw new IOException("Failed select blob for message: " + messageID + " in container.");
083            }
084
085            // Update the blob
086            Blob blob = rs.getBlob(1);
087            blob.truncate(0);
088            blob.setBytes(1, data);
089            rs.updateBlob(1, blob);
090            rs.updateRow();             // Update the row with the updated blob
091
092        } finally {
093            cleanupExclusiveLock.readLock().unlock();
094            close(rs);
095            close(s);
096        }
097    }
098
099    @Override
100    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
101        PreparedStatement s = null;
102        ResultSet rs = null;
103        cleanupExclusiveLock.readLock().lock();
104        try {
105
106            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
107            s.setString(1, id.getProducerId().toString());
108            s.setLong(2, id.getProducerSequenceId());
109            rs = s.executeQuery();
110
111            if (!rs.next()) {
112                return null;
113            }
114            Blob blob = rs.getBlob(1);
115            InputStream is = blob.getBinaryStream();
116
117            ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
118            int ch;
119            while ((ch = is.read()) >= 0) {
120                os.write(ch);
121            }
122            is.close();
123            os.close();
124
125            return os.toByteArray();
126
127        } finally {
128            cleanupExclusiveLock.readLock().unlock();
129            close(rs);
130            close(s);
131        }
132    }
133
134}