001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc.adapter; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.io.OutputStream; 022 import java.sql.Blob; 023 import java.sql.Connection; 024 import java.sql.PreparedStatement; 025 import java.sql.ResultSet; 026 import java.sql.SQLException; 027 028 import javax.jms.JMSException; 029 import javax.sql.rowset.serial.SerialBlob; 030 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.MessageId; 033 import org.apache.activemq.store.jdbc.TransactionContext; 034 import org.apache.activemq.util.ByteArrayOutputStream; 035 036 /** 037 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob() 038 * operations. This is a little more involved since to insert a blob you have 039 * to: 040 * 041 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data 042 * value. 043 * 044 * The databases/JDBC drivers that use this adapter are: 045 * <ul> 046 * <li></li> 047 * </ul> 048 * 049 * @org.apache.xbean.XBean element="blobJDBCAdapter" 050 * 051 * 052 */ 053 public class BlobJDBCAdapter extends DefaultJDBCAdapter { 054 055 @Override 056 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 057 long expiration, byte priority) throws SQLException, IOException { 058 PreparedStatement s = null; 059 ResultSet rs = null; 060 cleanupExclusiveLock.readLock().lock(); 061 try { 062 // Add the Blob record. 063 s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); 064 s.setLong(1, sequence); 065 s.setString(2, messageID.getProducerId().toString()); 066 s.setLong(3, messageID.getProducerSequenceId()); 067 s.setString(4, destination.getQualifiedName()); 068 s.setLong(5, expiration); 069 s.setLong(6, priority); 070 071 if (s.executeUpdate() != 1) { 072 throw new IOException("Failed to add broker message: " + messageID + " in container."); 073 } 074 s.close(); 075 076 // Select the blob record so that we can update it. 077 s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(), 078 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); 079 s.setLong(1, sequence); 080 rs = s.executeQuery(); 081 if (!rs.next()) { 082 throw new IOException("Failed select blob for message: " + messageID + " in container."); 083 } 084 085 // Update the blob 086 Blob blob = rs.getBlob(1); 087 blob.truncate(0); 088 blob.setBytes(1, data); 089 rs.updateBlob(1, blob); 090 rs.updateRow(); // Update the row with the updated blob 091 092 } finally { 093 cleanupExclusiveLock.readLock().unlock(); 094 close(rs); 095 close(s); 096 } 097 } 098 099 @Override 100 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 101 PreparedStatement s = null; 102 ResultSet rs = null; 103 cleanupExclusiveLock.readLock().lock(); 104 try { 105 106 s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); 107 s.setString(1, id.getProducerId().toString()); 108 s.setLong(2, id.getProducerSequenceId()); 109 rs = s.executeQuery(); 110 111 if (!rs.next()) { 112 return null; 113 } 114 Blob blob = rs.getBlob(1); 115 InputStream is = blob.getBinaryStream(); 116 117 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); 118 int ch; 119 while ((ch = is.read()) >= 0) { 120 os.write(ch); 121 } 122 is.close(); 123 os.close(); 124 125 return os.toByteArray(); 126 127 } finally { 128 cleanupExclusiveLock.readLock().unlock(); 129 close(rs); 130 close(s); 131 } 132 } 133 134 }