001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.store.AbstractMessageStore;
030    import org.apache.activemq.store.MessageRecoveryListener;
031    import org.apache.activemq.util.ByteSequence;
032    import org.apache.activemq.util.ByteSequenceData;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.wireformat.WireFormat;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * 
040     */
041    public class JDBCMessageStore extends AbstractMessageStore {
042    
043        class Duration {
044            static final int LIMIT = 100;
045            final long start = System.currentTimeMillis();
046            final String name;
047    
048            Duration(String name) {
049                this.name = name;
050            }
051            void end() {
052                end(null);
053            }
054            void end(Object o) {
055                long duration = System.currentTimeMillis() - start;
056    
057                if (duration > LIMIT) {
058                    System.err.println(name + " took a long time: " + duration + "ms " + o);
059                }
060            }
061        }
062        private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063        protected final WireFormat wireFormat;
064        protected final JDBCAdapter adapter;
065        protected final JDBCPersistenceAdapter persistenceAdapter;
066        protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067        protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068    
069        protected ActiveMQMessageAudit audit;
070        
071        public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
072            super(destination);
073            this.persistenceAdapter = persistenceAdapter;
074            this.adapter = adapter;
075            this.wireFormat = wireFormat;
076            this.audit = audit;
077    
078            if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
079                recordDestinationCreation(destination);
080            }
081        }
082    
083        private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
084            TransactionContext c = persistenceAdapter.getTransactionContext();
085            try {
086                c = persistenceAdapter.getTransactionContext();
087                if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
088                    adapter.doRecordDestination(c, destination);
089                }
090            } catch (SQLException e) {
091                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
092                throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
093            } finally {
094                c.close();
095            }
096        }
097    
098        public void addMessage(ConnectionContext context, Message message) throws IOException {
099            MessageId messageId = message.getMessageId();
100            if (audit != null && audit.isDuplicate(message)) {
101                if (LOG.isDebugEnabled()) {
102                    LOG.debug(destination.getPhysicalName()
103                        + " ignoring duplicated (add) message, already stored: "
104                        + messageId);
105                }
106                return;
107            }
108            
109            long sequenceId = persistenceAdapter.getNextSequenceId();
110            
111            // Serialize the Message..
112            byte data[];
113            try {
114                ByteSequence packet = wireFormat.marshal(message);
115                data = ByteSequenceData.toByteArray(packet);
116            } catch (IOException e) {
117                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
118            }
119    
120            // Get a connection and insert the message into the DB.
121            TransactionContext c = persistenceAdapter.getTransactionContext(context);
122            try {      
123                adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
124                        this.isPrioritizedMessages() ? message.getPriority() : 0);
125            } catch (SQLException e) {
126                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
128            } finally {
129                c.close();
130            }
131            onAdd(messageId, sequenceId, message.getPriority());
132        }
133    
134        protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
135        }
136    
137        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
138            // Get a connection and insert the message into the DB.
139            TransactionContext c = persistenceAdapter.getTransactionContext(context);
140            try {
141                adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
142            } catch (SQLException e) {
143                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
144                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
145            } finally {
146                c.close();
147            }
148        }
149    
150        public Message getMessage(MessageId messageId) throws IOException {
151            // Get a connection and pull the message out of the DB
152            TransactionContext c = persistenceAdapter.getTransactionContext();
153            try {
154                byte data[] = adapter.doGetMessage(c, messageId);
155                if (data == null) {
156                    return null;
157                }
158    
159                Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
160                return answer;
161            } catch (IOException e) {
162                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
163            } catch (SQLException e) {
164                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
165                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
166            } finally {
167                c.close();
168            }
169        }
170    
171        public String getMessageReference(MessageId messageId) throws IOException {
172            long id = messageId.getBrokerSequenceId();
173    
174            // Get a connection and pull the message out of the DB
175            TransactionContext c = persistenceAdapter.getTransactionContext();
176            try {
177                return adapter.doGetMessageReference(c, id);
178            } catch (IOException e) {
179                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
180            } catch (SQLException e) {
181                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
182                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
183            } finally {
184                c.close();
185            }
186        }
187    
188        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
189            
190            long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
191    
192            // Get a connection and remove the message from the DB
193            TransactionContext c = persistenceAdapter.getTransactionContext(context);
194            try {
195                adapter.doRemoveMessage(c, seq);
196            } catch (SQLException e) {
197                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
198                throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
199            } finally {
200                c.close();
201            }
202        }
203    
204        public void recover(final MessageRecoveryListener listener) throws Exception {
205    
206            // Get all the Message ids out of the database.
207            TransactionContext c = persistenceAdapter.getTransactionContext();
208            try {
209                c = persistenceAdapter.getTransactionContext();
210                adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
211                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
212                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
213                        msg.getMessageId().setBrokerSequenceId(sequenceId);
214                        return listener.recoverMessage(msg);
215                    }
216    
217                    public boolean recoverMessageReference(String reference) throws Exception {
218                        return listener.recoverMessageReference(new MessageId(reference));
219                    }
220                });
221            } catch (SQLException e) {
222                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
223                throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
224            } finally {
225                c.close();
226            }
227        }
228    
229        /**
230         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
231         */
232        public void removeAllMessages(ConnectionContext context) throws IOException {
233            // Get a connection and remove the message from the DB
234            TransactionContext c = persistenceAdapter.getTransactionContext(context);
235            try {
236                adapter.doRemoveAllMessages(c, destination);
237            } catch (SQLException e) {
238                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
239                throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
240            } finally {
241                c.close();
242            }
243        }
244    
245        public int getMessageCount() throws IOException {
246            int result = 0;
247            TransactionContext c = persistenceAdapter.getTransactionContext();
248            try {
249    
250                result = adapter.doGetMessageCount(c, destination);
251    
252            } catch (SQLException e) {
253                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
254                throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
255            } finally {
256                c.close();
257            }
258            return result;
259        }
260    
261        /**
262         * @param maxReturned
263         * @param listener
264         * @throws Exception
265         * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
266         *      org.apache.activemq.store.MessageRecoveryListener)
267         */
268        public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
269            TransactionContext c = persistenceAdapter.getTransactionContext();
270            try {
271                adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
272                        maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
273    
274                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
275                        if (listener.hasSpace()) {
276                            Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
277                            msg.getMessageId().setBrokerSequenceId(sequenceId);
278                            listener.recoverMessage(msg);
279                            lastRecoveredSequenceId.set(sequenceId);
280                            lastRecoveredPriority.set(msg.getPriority());
281                            return true;
282                        }
283                        return false;
284                    }
285    
286                    public boolean recoverMessageReference(String reference) throws Exception {
287                        if (listener.hasSpace()) {
288                            listener.recoverMessageReference(new MessageId(reference));
289                            return true;
290                        }
291                        return false;
292                    }
293    
294                });
295            } catch (SQLException e) {
296                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
297            } finally {
298                c.close();
299            }
300    
301        }
302    
303        /**
304         * @see org.apache.activemq.store.MessageStore#resetBatching()
305         */
306        public void resetBatching() {
307            if (LOG.isTraceEnabled()) {
308                LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
309            }
310            lastRecoveredSequenceId.set(-1);
311            lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
312    
313        }
314    
315        @Override
316        public void setBatch(MessageId messageId) {
317            try {
318                long[] storedValues = getStoreSequenceIdForMessageId(messageId);
319                lastRecoveredSequenceId.set(storedValues[0]);
320                lastRecoveredPriority.set(storedValues[1]);
321            } catch (IOException ignoredAsAlreadyLogged) {
322                lastRecoveredSequenceId.set(-1);
323                lastRecoveredPriority.set(Byte.MAX_VALUE -1);
324            }
325            if (LOG.isTraceEnabled()) {
326                LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
327                        + ", priority: " + lastRecoveredPriority.get());
328            }
329        }
330    
331        private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
332            long[] result = new long[]{-1, Byte.MAX_VALUE -1};
333            TransactionContext c = persistenceAdapter.getTransactionContext();
334            try {
335                result = adapter.getStoreSequenceId(c, destination, messageId);
336            } catch (SQLException e) {
337                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
338                throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
339            } finally {
340                c.close();
341            }
342            return result;
343        }
344        
345        public void setPrioritizedMessages(boolean prioritizedMessages) {
346            super.setPrioritizedMessages(prioritizedMessages);
347        }   
348    }