001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.journal; 018 019 import java.io.File; 020 import java.io.IOException; 021 022 import org.apache.activeio.journal.Journal; 023 import org.apache.activeio.journal.active.JournalImpl; 024 import org.apache.activeio.journal.active.JournalLockedException; 025 import org.apache.activemq.store.PersistenceAdapter; 026 import org.apache.activemq.store.PersistenceAdapterFactory; 027 import org.apache.activemq.store.jdbc.DataSourceSupport; 028 import org.apache.activemq.store.jdbc.JDBCAdapter; 029 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 030 import org.apache.activemq.store.jdbc.Statements; 031 import org.apache.activemq.thread.TaskRunnerFactory; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 /** 036 * Factory class that can create PersistenceAdapter objects. 037 * 038 * @org.apache.xbean.XBean 039 * 040 */ 041 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory { 042 043 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 044 045 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class); 046 047 private int journalLogFileSize = 1024 * 1024 * 20; 048 private int journalLogFiles = 2; 049 private TaskRunnerFactory taskRunnerFactory; 050 private Journal journal; 051 private boolean useJournal = true; 052 private boolean useQuickJournal; 053 private File journalArchiveDirectory; 054 private boolean failIfJournalIsLocked; 055 private int journalThreadPriority = Thread.MAX_PRIORITY; 056 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); 057 private boolean useDedicatedTaskRunner; 058 059 public PersistenceAdapter createPersistenceAdapter() throws IOException { 060 jdbcPersistenceAdapter.setDataSource(getDataSource()); 061 062 if (!useJournal) { 063 return jdbcPersistenceAdapter; 064 } 065 JournalPersistenceAdapter result = new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); 066 result.setDirectory(getDataDirectoryFile()); 067 return result; 068 069 } 070 071 public int getJournalLogFiles() { 072 return journalLogFiles; 073 } 074 075 /** 076 * Sets the number of journal log files to use 077 */ 078 public void setJournalLogFiles(int journalLogFiles) { 079 this.journalLogFiles = journalLogFiles; 080 } 081 082 public int getJournalLogFileSize() { 083 return journalLogFileSize; 084 } 085 086 /** 087 * Sets the size of the journal log files 088 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 089 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 090 */ 091 public void setJournalLogFileSize(int journalLogFileSize) { 092 this.journalLogFileSize = journalLogFileSize; 093 } 094 095 public JDBCPersistenceAdapter getJdbcAdapter() { 096 return jdbcPersistenceAdapter; 097 } 098 099 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) { 100 this.jdbcPersistenceAdapter = jdbcAdapter; 101 } 102 103 public boolean isUseJournal() { 104 return useJournal; 105 } 106 107 /** 108 * Enables or disables the use of the journal. The default is to use the 109 * journal 110 * 111 * @param useJournal 112 */ 113 public void setUseJournal(boolean useJournal) { 114 this.useJournal = useJournal; 115 } 116 117 public boolean isUseDedicatedTaskRunner() { 118 return useDedicatedTaskRunner; 119 } 120 121 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 122 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 123 } 124 125 public TaskRunnerFactory getTaskRunnerFactory() { 126 if (taskRunnerFactory == null) { 127 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, 128 true, 1000, isUseDedicatedTaskRunner()); 129 } 130 return taskRunnerFactory; 131 } 132 133 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 134 this.taskRunnerFactory = taskRunnerFactory; 135 } 136 137 public Journal getJournal() throws IOException { 138 if (journal == null) { 139 createJournal(); 140 } 141 return journal; 142 } 143 144 public void setJournal(Journal journal) { 145 this.journal = journal; 146 } 147 148 public File getJournalArchiveDirectory() { 149 if (journalArchiveDirectory == null && useQuickJournal) { 150 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal"); 151 } 152 return journalArchiveDirectory; 153 } 154 155 public void setJournalArchiveDirectory(File journalArchiveDirectory) { 156 this.journalArchiveDirectory = journalArchiveDirectory; 157 } 158 159 public boolean isUseQuickJournal() { 160 return useQuickJournal; 161 } 162 163 /** 164 * Enables or disables the use of quick journal, which keeps messages in the 165 * journal and just stores a reference to the messages in JDBC. Defaults to 166 * false so that messages actually reside long term in the JDBC database. 167 */ 168 public void setUseQuickJournal(boolean useQuickJournal) { 169 this.useQuickJournal = useQuickJournal; 170 } 171 172 public JDBCAdapter getAdapter() throws IOException { 173 return jdbcPersistenceAdapter.getAdapter(); 174 } 175 176 public void setAdapter(JDBCAdapter adapter) { 177 jdbcPersistenceAdapter.setAdapter(adapter); 178 } 179 180 public Statements getStatements() { 181 return jdbcPersistenceAdapter.getStatements(); 182 } 183 184 public void setStatements(Statements statements) { 185 jdbcPersistenceAdapter.setStatements(statements); 186 } 187 188 public boolean isUseDatabaseLock() { 189 return jdbcPersistenceAdapter.isUseDatabaseLock(); 190 } 191 192 /** 193 * Sets whether or not an exclusive database lock should be used to enable 194 * JDBC Master/Slave. Enabled by default. 195 */ 196 public void setUseDatabaseLock(boolean useDatabaseLock) { 197 jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock); 198 } 199 200 public boolean isCreateTablesOnStartup() { 201 return jdbcPersistenceAdapter.isCreateTablesOnStartup(); 202 } 203 204 /** 205 * Sets whether or not tables are created on startup 206 */ 207 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 208 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); 209 } 210 211 public int getJournalThreadPriority() { 212 return journalThreadPriority; 213 } 214 215 /** 216 * Sets the thread priority of the journal thread 217 */ 218 public void setJournalThreadPriority(int journalThreadPriority) { 219 this.journalThreadPriority = journalThreadPriority; 220 } 221 222 /** 223 * @throws IOException 224 */ 225 protected void createJournal() throws IOException { 226 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile(); 227 if (failIfJournalIsLocked) { 228 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 229 getJournalArchiveDirectory()); 230 } else { 231 while (true) { 232 try { 233 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 234 getJournalArchiveDirectory()); 235 break; 236 } catch (JournalLockedException e) { 237 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) 238 + " seconds for the journal to be unlocked."); 239 try { 240 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 241 } catch (InterruptedException e1) { 242 } 243 } 244 } 245 } 246 } 247 248 }