001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
034    
035    /**
036     * Factory class that can create PersistenceAdapter objects.
037     * 
038     * @org.apache.xbean.XBean
039     * 
040     */
041    public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
042    
043        private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
044    
045        private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
046    
047        private int journalLogFileSize = 1024 * 1024 * 20;
048        private int journalLogFiles = 2;
049        private TaskRunnerFactory taskRunnerFactory;
050        private Journal journal;
051        private boolean useJournal = true;
052        private boolean useQuickJournal;
053        private File journalArchiveDirectory;
054        private boolean failIfJournalIsLocked;
055        private int journalThreadPriority = Thread.MAX_PRIORITY;
056        private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
057        private boolean useDedicatedTaskRunner;
058    
059        public PersistenceAdapter createPersistenceAdapter() throws IOException {
060            jdbcPersistenceAdapter.setDataSource(getDataSource());
061    
062            if (!useJournal) {
063                return jdbcPersistenceAdapter;
064            }
065            JournalPersistenceAdapter result =  new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
066            result.setDirectory(getDataDirectoryFile());
067            return result;
068    
069        }
070    
071        public int getJournalLogFiles() {
072            return journalLogFiles;
073        }
074    
075        /**
076         * Sets the number of journal log files to use
077         */
078        public void setJournalLogFiles(int journalLogFiles) {
079            this.journalLogFiles = journalLogFiles;
080        }
081    
082        public int getJournalLogFileSize() {
083            return journalLogFileSize;
084        }
085    
086        /**
087         * Sets the size of the journal log files
088         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
089         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
090         */
091        public void setJournalLogFileSize(int journalLogFileSize) {
092            this.journalLogFileSize = journalLogFileSize;
093        }
094    
095        public JDBCPersistenceAdapter getJdbcAdapter() {
096            return jdbcPersistenceAdapter;
097        }
098    
099        public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
100            this.jdbcPersistenceAdapter = jdbcAdapter;
101        }
102    
103        public boolean isUseJournal() {
104            return useJournal;
105        }
106    
107        /**
108         * Enables or disables the use of the journal. The default is to use the
109         * journal
110         * 
111         * @param useJournal
112         */
113        public void setUseJournal(boolean useJournal) {
114            this.useJournal = useJournal;
115        }
116    
117        public boolean isUseDedicatedTaskRunner() {
118            return useDedicatedTaskRunner;
119        }
120        
121        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
122            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
123        }
124        
125        public TaskRunnerFactory getTaskRunnerFactory() {
126            if (taskRunnerFactory == null) {
127                taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
128                                                          true, 1000, isUseDedicatedTaskRunner());
129            }
130            return taskRunnerFactory;
131        }
132    
133        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
134            this.taskRunnerFactory = taskRunnerFactory;
135        }
136    
137        public Journal getJournal() throws IOException {
138            if (journal == null) {
139                createJournal();
140            }
141            return journal;
142        }
143    
144        public void setJournal(Journal journal) {
145            this.journal = journal;
146        }
147    
148        public File getJournalArchiveDirectory() {
149            if (journalArchiveDirectory == null && useQuickJournal) {
150                journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
151            }
152            return journalArchiveDirectory;
153        }
154    
155        public void setJournalArchiveDirectory(File journalArchiveDirectory) {
156            this.journalArchiveDirectory = journalArchiveDirectory;
157        }
158    
159        public boolean isUseQuickJournal() {
160            return useQuickJournal;
161        }
162    
163        /**
164         * Enables or disables the use of quick journal, which keeps messages in the
165         * journal and just stores a reference to the messages in JDBC. Defaults to
166         * false so that messages actually reside long term in the JDBC database.
167         */
168        public void setUseQuickJournal(boolean useQuickJournal) {
169            this.useQuickJournal = useQuickJournal;
170        }
171    
172        public JDBCAdapter getAdapter() throws IOException {
173            return jdbcPersistenceAdapter.getAdapter();
174        }
175    
176        public void setAdapter(JDBCAdapter adapter) {
177            jdbcPersistenceAdapter.setAdapter(adapter);
178        }
179    
180        public Statements getStatements() {
181            return jdbcPersistenceAdapter.getStatements();
182        }
183    
184        public void setStatements(Statements statements) {
185            jdbcPersistenceAdapter.setStatements(statements);
186        }
187    
188        public boolean isUseDatabaseLock() {
189            return jdbcPersistenceAdapter.isUseDatabaseLock();
190        }
191    
192        /**
193         * Sets whether or not an exclusive database lock should be used to enable
194         * JDBC Master/Slave. Enabled by default.
195         */
196        public void setUseDatabaseLock(boolean useDatabaseLock) {
197            jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
198        }
199    
200        public boolean isCreateTablesOnStartup() {
201            return jdbcPersistenceAdapter.isCreateTablesOnStartup();
202        }
203    
204        /**
205         * Sets whether or not tables are created on startup
206         */
207        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
208            jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
209        }
210    
211        public int getJournalThreadPriority() {
212            return journalThreadPriority;
213        }
214    
215        /**
216         * Sets the thread priority of the journal thread
217         */
218        public void setJournalThreadPriority(int journalThreadPriority) {
219            this.journalThreadPriority = journalThreadPriority;
220        }
221    
222        /**
223         * @throws IOException
224         */
225        protected void createJournal() throws IOException {
226            File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
227            if (failIfJournalIsLocked) {
228                journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
229                                          getJournalArchiveDirectory());
230            } else {
231                while (true) {
232                    try {
233                        journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
234                                                  getJournalArchiveDirectory());
235                        break;
236                    } catch (JournalLockedException e) {
237                        LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
238                                 + " seconds for the journal to be unlocked.");
239                        try {
240                            Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
241                        } catch (InterruptedException e1) {
242                        }
243                    }
244                }
245            }
246        }
247    
248    }