001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.concurrent.atomic.AtomicLong;
022
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.command.MessageId;
029import org.apache.activemq.store.AbstractMessageStore;
030import org.apache.activemq.store.MessageRecoveryListener;
031import org.apache.activemq.util.ByteSequence;
032import org.apache.activemq.util.ByteSequenceData;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.wireformat.WireFormat;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * 
040 */
041public class JDBCMessageStore extends AbstractMessageStore {
042
043    class Duration {
044        static final int LIMIT = 100;
045        final long start = System.currentTimeMillis();
046        final String name;
047
048        Duration(String name) {
049            this.name = name;
050        }
051        void end() {
052            end(null);
053        }
054        void end(Object o) {
055            long duration = System.currentTimeMillis() - start;
056
057            if (duration > LIMIT) {
058                System.err.println(name + " took a long time: " + duration + "ms " + o);
059            }
060        }
061    }
062    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063    protected final WireFormat wireFormat;
064    protected final JDBCAdapter adapter;
065    protected final JDBCPersistenceAdapter persistenceAdapter;
066    protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067    protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068
069    protected ActiveMQMessageAudit audit;
070    
071    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
072        super(destination);
073        this.persistenceAdapter = persistenceAdapter;
074        this.adapter = adapter;
075        this.wireFormat = wireFormat;
076        this.audit = audit;
077
078        if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
079            recordDestinationCreation(destination);
080        }
081    }
082
083    private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
084        TransactionContext c = persistenceAdapter.getTransactionContext();
085        try {
086            c = persistenceAdapter.getTransactionContext();
087            if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
088                adapter.doRecordDestination(c, destination);
089            }
090        } catch (SQLException e) {
091            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
092            throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
093        } finally {
094            c.close();
095        }
096    }
097
098    public void addMessage(ConnectionContext context, Message message) throws IOException {
099        MessageId messageId = message.getMessageId();
100        if (audit != null && audit.isDuplicate(message)) {
101            if (LOG.isDebugEnabled()) {
102                LOG.debug(destination.getPhysicalName()
103                    + " ignoring duplicated (add) message, already stored: "
104                    + messageId);
105            }
106            return;
107        }
108        
109        long sequenceId = persistenceAdapter.getNextSequenceId();
110        
111        // Serialize the Message..
112        byte data[];
113        try {
114            ByteSequence packet = wireFormat.marshal(message);
115            data = ByteSequenceData.toByteArray(packet);
116        } catch (IOException e) {
117            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
118        }
119
120        // Get a connection and insert the message into the DB.
121        TransactionContext c = persistenceAdapter.getTransactionContext(context);
122        try {      
123            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
124                    this.isPrioritizedMessages() ? message.getPriority() : 0);
125        } catch (SQLException e) {
126            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
128        } finally {
129            c.close();
130        }
131        onAdd(messageId, sequenceId, message.getPriority());
132    }
133
134    protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
135    }
136
137    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
138        // Get a connection and insert the message into the DB.
139        TransactionContext c = persistenceAdapter.getTransactionContext(context);
140        try {
141            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
142        } catch (SQLException e) {
143            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
144            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
145        } finally {
146            c.close();
147        }
148    }
149
150    public Message getMessage(MessageId messageId) throws IOException {
151        // Get a connection and pull the message out of the DB
152        TransactionContext c = persistenceAdapter.getTransactionContext();
153        try {
154            byte data[] = adapter.doGetMessage(c, messageId);
155            if (data == null) {
156                return null;
157            }
158
159            Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
160            return answer;
161        } catch (IOException e) {
162            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
163        } catch (SQLException e) {
164            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
165            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
166        } finally {
167            c.close();
168        }
169    }
170
171    public String getMessageReference(MessageId messageId) throws IOException {
172        long id = messageId.getBrokerSequenceId();
173
174        // Get a connection and pull the message out of the DB
175        TransactionContext c = persistenceAdapter.getTransactionContext();
176        try {
177            return adapter.doGetMessageReference(c, id);
178        } catch (IOException e) {
179            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
180        } catch (SQLException e) {
181            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
182            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
183        } finally {
184            c.close();
185        }
186    }
187
188    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
189        
190        long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
191
192        // Get a connection and remove the message from the DB
193        TransactionContext c = persistenceAdapter.getTransactionContext(context);
194        try {
195            adapter.doRemoveMessage(c, seq);
196        } catch (SQLException e) {
197            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
198            throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
199        } finally {
200            c.close();
201        }
202    }
203
204    public void recover(final MessageRecoveryListener listener) throws Exception {
205
206        // Get all the Message ids out of the database.
207        TransactionContext c = persistenceAdapter.getTransactionContext();
208        try {
209            c = persistenceAdapter.getTransactionContext();
210            adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
211                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
212                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
213                    msg.getMessageId().setBrokerSequenceId(sequenceId);
214                    return listener.recoverMessage(msg);
215                }
216
217                public boolean recoverMessageReference(String reference) throws Exception {
218                    return listener.recoverMessageReference(new MessageId(reference));
219                }
220            });
221        } catch (SQLException e) {
222            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
223            throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
224        } finally {
225            c.close();
226        }
227    }
228
229    /**
230     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
231     */
232    public void removeAllMessages(ConnectionContext context) throws IOException {
233        // Get a connection and remove the message from the DB
234        TransactionContext c = persistenceAdapter.getTransactionContext(context);
235        try {
236            adapter.doRemoveAllMessages(c, destination);
237        } catch (SQLException e) {
238            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
239            throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
240        } finally {
241            c.close();
242        }
243    }
244
245    public int getMessageCount() throws IOException {
246        int result = 0;
247        TransactionContext c = persistenceAdapter.getTransactionContext();
248        try {
249
250            result = adapter.doGetMessageCount(c, destination);
251
252        } catch (SQLException e) {
253            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
254            throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
255        } finally {
256            c.close();
257        }
258        return result;
259    }
260
261    /**
262     * @param maxReturned
263     * @param listener
264     * @throws Exception
265     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
266     *      org.apache.activemq.store.MessageRecoveryListener)
267     */
268    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
269        TransactionContext c = persistenceAdapter.getTransactionContext();
270        try {
271            adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
272                    maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
273
274                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
275                    if (listener.hasSpace()) {
276                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
277                        msg.getMessageId().setBrokerSequenceId(sequenceId);
278                        listener.recoverMessage(msg);
279                        lastRecoveredSequenceId.set(sequenceId);
280                        lastRecoveredPriority.set(msg.getPriority());
281                        return true;
282                    }
283                    return false;
284                }
285
286                public boolean recoverMessageReference(String reference) throws Exception {
287                    if (listener.hasSpace()) {
288                        listener.recoverMessageReference(new MessageId(reference));
289                        return true;
290                    }
291                    return false;
292                }
293
294            });
295        } catch (SQLException e) {
296            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
297        } finally {
298            c.close();
299        }
300
301    }
302
303    /**
304     * @see org.apache.activemq.store.MessageStore#resetBatching()
305     */
306    public void resetBatching() {
307        if (LOG.isTraceEnabled()) {
308            LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
309        }
310        lastRecoveredSequenceId.set(-1);
311        lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
312
313    }
314
315    @Override
316    public void setBatch(MessageId messageId) {
317        try {
318            long[] storedValues = getStoreSequenceIdForMessageId(messageId);
319            lastRecoveredSequenceId.set(storedValues[0]);
320            lastRecoveredPriority.set(storedValues[1]);
321        } catch (IOException ignoredAsAlreadyLogged) {
322            lastRecoveredSequenceId.set(-1);
323            lastRecoveredPriority.set(Byte.MAX_VALUE -1);
324        }
325        if (LOG.isTraceEnabled()) {
326            LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
327                    + ", priority: " + lastRecoveredPriority.get());
328        }
329    }
330
331    private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
332        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
333        TransactionContext c = persistenceAdapter.getTransactionContext();
334        try {
335            result = adapter.getStoreSequenceId(c, destination, messageId);
336        } catch (SQLException e) {
337            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
338            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
339        } finally {
340            c.close();
341        }
342        return result;
343    }
344    
345    public void setPrioritizedMessages(boolean prioritizedMessages) {
346        super.setPrioritizedMessages(prioritizedMessages);
347    }   
348}