001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.jdbc; 018 019 import java.io.IOException; 020 import java.sql.SQLException; 021 import java.util.concurrent.atomic.AtomicLong; 022 023 import org.apache.activemq.ActiveMQMessageAudit; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.command.ActiveMQDestination; 026 import org.apache.activemq.command.Message; 027 import org.apache.activemq.command.MessageAck; 028 import org.apache.activemq.command.MessageId; 029 import org.apache.activemq.store.AbstractMessageStore; 030 import org.apache.activemq.store.MessageRecoveryListener; 031 import org.apache.activemq.util.ByteSequence; 032 import org.apache.activemq.util.ByteSequenceData; 033 import org.apache.activemq.util.IOExceptionSupport; 034 import org.apache.activemq.wireformat.WireFormat; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * 040 */ 041 public class JDBCMessageStore extends AbstractMessageStore { 042 043 class Duration { 044 static final int LIMIT = 100; 045 final long start = System.currentTimeMillis(); 046 final String name; 047 048 Duration(String name) { 049 this.name = name; 050 } 051 void end() { 052 end(null); 053 } 054 void end(Object o) { 055 long duration = System.currentTimeMillis() - start; 056 057 if (duration > LIMIT) { 058 System.err.println(name + " took a long time: " + duration + "ms " + o); 059 } 060 } 061 } 062 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class); 063 protected final WireFormat wireFormat; 064 protected final JDBCAdapter adapter; 065 protected final JDBCPersistenceAdapter persistenceAdapter; 066 protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); 067 protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); 068 069 protected ActiveMQMessageAudit audit; 070 071 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { 072 super(destination); 073 this.persistenceAdapter = persistenceAdapter; 074 this.adapter = adapter; 075 this.wireFormat = wireFormat; 076 this.audit = audit; 077 078 if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { 079 recordDestinationCreation(destination); 080 } 081 } 082 083 private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { 084 TransactionContext c = persistenceAdapter.getTransactionContext(); 085 try { 086 c = persistenceAdapter.getTransactionContext(); 087 if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) { 088 adapter.doRecordDestination(c, destination); 089 } 090 } catch (SQLException e) { 091 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 092 throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e); 093 } finally { 094 c.close(); 095 } 096 } 097 098 public void addMessage(ConnectionContext context, Message message) throws IOException { 099 MessageId messageId = message.getMessageId(); 100 if (audit != null && audit.isDuplicate(message)) { 101 if (LOG.isDebugEnabled()) { 102 LOG.debug(destination.getPhysicalName() 103 + " ignoring duplicated (add) message, already stored: " 104 + messageId); 105 } 106 return; 107 } 108 109 long sequenceId = persistenceAdapter.getNextSequenceId(); 110 111 // Serialize the Message.. 112 byte data[]; 113 try { 114 ByteSequence packet = wireFormat.marshal(message); 115 data = ByteSequenceData.toByteArray(packet); 116 } catch (IOException e) { 117 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 118 } 119 120 // Get a connection and insert the message into the DB. 121 TransactionContext c = persistenceAdapter.getTransactionContext(context); 122 try { 123 adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), 124 this.isPrioritizedMessages() ? message.getPriority() : 0); 125 } catch (SQLException e) { 126 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 127 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 128 } finally { 129 c.close(); 130 } 131 onAdd(messageId, sequenceId, message.getPriority()); 132 } 133 134 protected void onAdd(MessageId messageId, long sequenceId, byte priority) { 135 } 136 137 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 138 // Get a connection and insert the message into the DB. 139 TransactionContext c = persistenceAdapter.getTransactionContext(context); 140 try { 141 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef); 142 } catch (SQLException e) { 143 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 144 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 145 } finally { 146 c.close(); 147 } 148 } 149 150 public Message getMessage(MessageId messageId) throws IOException { 151 // Get a connection and pull the message out of the DB 152 TransactionContext c = persistenceAdapter.getTransactionContext(); 153 try { 154 byte data[] = adapter.doGetMessage(c, messageId); 155 if (data == null) { 156 return null; 157 } 158 159 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data)); 160 return answer; 161 } catch (IOException e) { 162 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 163 } catch (SQLException e) { 164 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 165 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 166 } finally { 167 c.close(); 168 } 169 } 170 171 public String getMessageReference(MessageId messageId) throws IOException { 172 long id = messageId.getBrokerSequenceId(); 173 174 // Get a connection and pull the message out of the DB 175 TransactionContext c = persistenceAdapter.getTransactionContext(); 176 try { 177 return adapter.doGetMessageReference(c, id); 178 } catch (IOException e) { 179 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 180 } catch (SQLException e) { 181 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 182 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 183 } finally { 184 c.close(); 185 } 186 } 187 188 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 189 190 long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0]; 191 192 // Get a connection and remove the message from the DB 193 TransactionContext c = persistenceAdapter.getTransactionContext(context); 194 try { 195 adapter.doRemoveMessage(c, seq); 196 } catch (SQLException e) { 197 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 198 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e); 199 } finally { 200 c.close(); 201 } 202 } 203 204 public void recover(final MessageRecoveryListener listener) throws Exception { 205 206 // Get all the Message ids out of the database. 207 TransactionContext c = persistenceAdapter.getTransactionContext(); 208 try { 209 c = persistenceAdapter.getTransactionContext(); 210 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { 211 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 212 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 213 msg.getMessageId().setBrokerSequenceId(sequenceId); 214 return listener.recoverMessage(msg); 215 } 216 217 public boolean recoverMessageReference(String reference) throws Exception { 218 return listener.recoverMessageReference(new MessageId(reference)); 219 } 220 }); 221 } catch (SQLException e) { 222 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 223 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e); 224 } finally { 225 c.close(); 226 } 227 } 228 229 /** 230 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 231 */ 232 public void removeAllMessages(ConnectionContext context) throws IOException { 233 // Get a connection and remove the message from the DB 234 TransactionContext c = persistenceAdapter.getTransactionContext(context); 235 try { 236 adapter.doRemoveAllMessages(c, destination); 237 } catch (SQLException e) { 238 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 239 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e); 240 } finally { 241 c.close(); 242 } 243 } 244 245 public int getMessageCount() throws IOException { 246 int result = 0; 247 TransactionContext c = persistenceAdapter.getTransactionContext(); 248 try { 249 250 result = adapter.doGetMessageCount(c, destination); 251 252 } catch (SQLException e) { 253 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 254 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e); 255 } finally { 256 c.close(); 257 } 258 return result; 259 } 260 261 /** 262 * @param maxReturned 263 * @param listener 264 * @throws Exception 265 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, 266 * org.apache.activemq.store.MessageRecoveryListener) 267 */ 268 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { 269 TransactionContext c = persistenceAdapter.getTransactionContext(); 270 try { 271 adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), 272 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { 273 274 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 275 if (listener.hasSpace()) { 276 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 277 msg.getMessageId().setBrokerSequenceId(sequenceId); 278 listener.recoverMessage(msg); 279 lastRecoveredSequenceId.set(sequenceId); 280 lastRecoveredPriority.set(msg.getPriority()); 281 return true; 282 } 283 return false; 284 } 285 286 public boolean recoverMessageReference(String reference) throws Exception { 287 if (listener.hasSpace()) { 288 listener.recoverMessageReference(new MessageId(reference)); 289 return true; 290 } 291 return false; 292 } 293 294 }); 295 } catch (SQLException e) { 296 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 297 } finally { 298 c.close(); 299 } 300 301 } 302 303 /** 304 * @see org.apache.activemq.store.MessageStore#resetBatching() 305 */ 306 public void resetBatching() { 307 if (LOG.isTraceEnabled()) { 308 LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); 309 } 310 lastRecoveredSequenceId.set(-1); 311 lastRecoveredPriority.set(Byte.MAX_VALUE - 1); 312 313 } 314 315 @Override 316 public void setBatch(MessageId messageId) { 317 try { 318 long[] storedValues = getStoreSequenceIdForMessageId(messageId); 319 lastRecoveredSequenceId.set(storedValues[0]); 320 lastRecoveredPriority.set(storedValues[1]); 321 } catch (IOException ignoredAsAlreadyLogged) { 322 lastRecoveredSequenceId.set(-1); 323 lastRecoveredPriority.set(Byte.MAX_VALUE -1); 324 } 325 if (LOG.isTraceEnabled()) { 326 LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() 327 + ", priority: " + lastRecoveredPriority.get()); 328 } 329 } 330 331 private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException { 332 long[] result = new long[]{-1, Byte.MAX_VALUE -1}; 333 TransactionContext c = persistenceAdapter.getTransactionContext(); 334 try { 335 result = adapter.getStoreSequenceId(c, destination, messageId); 336 } catch (SQLException e) { 337 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 338 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); 339 } finally { 340 c.close(); 341 } 342 return result; 343 } 344 345 public void setPrioritizedMessages(boolean prioritizedMessages) { 346 super.setPrioritizedMessages(prioritizedMessages); 347 } 348 }