001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.File; 020import java.io.FileFilter; 021import java.io.IOException; 022import java.nio.charset.Charset; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import org.apache.activemq.broker.BrokerService; 030import org.apache.activemq.broker.BrokerServiceAware; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQQueue; 034import org.apache.activemq.command.ActiveMQTopic; 035import org.apache.activemq.command.LocalTransactionId; 036import org.apache.activemq.command.ProducerId; 037import org.apache.activemq.command.TransactionId; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.filter.AnyDestination; 040import org.apache.activemq.filter.DestinationMap; 041import org.apache.activemq.protobuf.Buffer; 042import org.apache.activemq.store.MessageStore; 043import org.apache.activemq.store.PersistenceAdapter; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.store.TransactionStore; 046import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 047import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 048import org.apache.activemq.usage.SystemUsage; 049import org.apache.activemq.util.IOHelper; 050import org.apache.activemq.util.IntrospectionSupport; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports 056 * distribution of destinations across multiple kahaDB persistence adapters 057 * 058 * @org.apache.xbean.XBean element="mKahaDB" 059 */ 060public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware { 061 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class); 062 063 final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}); 064 final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616")); 065 066 BrokerService brokerService; 067 List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>(); 068 private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB"); 069 070 MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this); 071 072 // all local store transactions are XA, 2pc if more than one adapter involved 073 TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() { 074 @Override 075 public KahaTransactionInfo transform(TransactionId txid) { 076 if (txid == null) { 077 return null; 078 } 079 KahaTransactionInfo rc = new KahaTransactionInfo(); 080 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 081 if (txid.isLocalTransaction()) { 082 LocalTransactionId t = (LocalTransactionId) txid; 083 kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8")))); 084 kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8")))); 085 kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC); 086 } else { 087 XATransactionId t = (XATransactionId) txid; 088 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 089 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 090 kahaTxId.setFormatId(t.getFormatId()); 091 } 092 rc.setXaTransacitonId(kahaTxId); 093 return rc; 094 } 095 }; 096 097 /** 098 * Sets the FilteredKahaDBPersistenceAdapter entries 099 * 100 * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter" 101 */ 102 @SuppressWarnings({ "rawtypes", "unchecked" }) 103 public void setFilteredPersistenceAdapters(List entries) { 104 for (Object entry : entries) { 105 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry; 106 KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter(); 107 if (filteredAdapter.getDestination() == null) { 108 filteredAdapter.setDestination(matchAll); 109 } 110 111 if (filteredAdapter.isPerDestination()) { 112 configureDirectory(adapter, null); 113 // per destination adapters will be created on demand or during recovery 114 continue; 115 } else { 116 configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination())); 117 } 118 119 configureAdapter(adapter); 120 adapters.add(adapter); 121 } 122 super.setEntries(entries); 123 } 124 125 private String nameFromDestinationFilter(ActiveMQDestination destination) { 126 return IOHelper.toFileSystemSafeName(destination.getQualifiedName()); 127 } 128 129 public boolean isLocalXid(TransactionId xid) { 130 return xid instanceof XATransactionId && 131 ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC; 132 } 133 134 public void beginTransaction(ConnectionContext context) throws IOException { 135 throw new IllegalStateException(); 136 } 137 138 public void checkpoint(final boolean sync) throws IOException { 139 for (PersistenceAdapter persistenceAdapter : adapters) { 140 persistenceAdapter.checkpoint(sync); 141 } 142 } 143 144 public void commitTransaction(ConnectionContext context) throws IOException { 145 throw new IllegalStateException(); 146 } 147 148 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 149 PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination); 150 return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination)); 151 } 152 153 private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) { 154 Object result = this.chooseValue(destination); 155 if (result == null) { 156 throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters); 157 } 158 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result; 159 if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) { 160 result = addAdapter(filteredAdapter, destination); 161 startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName()); 162 if (LOG.isTraceEnabled()) { 163 LOG.info("created per destination adapter for: " + destination + ", " + result); 164 } 165 } 166 return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(); 167 } 168 169 private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) { 170 try { 171 kahaDBPersistenceAdapter.start(); 172 } catch (Exception e) { 173 RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e); 174 LOG.error(detail.toString(), e); 175 throw detail; 176 } 177 } 178 179 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 180 PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination); 181 return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination)); 182 } 183 184 public TransactionStore createTransactionStore() throws IOException { 185 return transactionStore; 186 } 187 188 public void deleteAllMessages() throws IOException { 189 for (PersistenceAdapter persistenceAdapter : adapters) { 190 persistenceAdapter.deleteAllMessages(); 191 } 192 transactionStore.deleteAllMessages(); 193 IOHelper.deleteChildren(getDirectory()); 194 } 195 196 public Set<ActiveMQDestination> getDestinations() { 197 Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>(); 198 for (PersistenceAdapter persistenceAdapter : adapters) { 199 results.addAll(persistenceAdapter.getDestinations()); 200 } 201 return results; 202 } 203 204 public long getLastMessageBrokerSequenceId() throws IOException { 205 long maxId = -1; 206 for (PersistenceAdapter persistenceAdapter : adapters) { 207 maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId()); 208 } 209 return maxId; 210 } 211 212 public long getLastProducerSequenceId(ProducerId id) throws IOException { 213 long maxId = -1; 214 for (PersistenceAdapter persistenceAdapter : adapters) { 215 maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id)); 216 } 217 return maxId; 218 } 219 220 public void removeQueueMessageStore(ActiveMQQueue destination) { 221 getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination); 222 } 223 224 public void removeTopicMessageStore(ActiveMQTopic destination) { 225 getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination); 226 } 227 228 public void rollbackTransaction(ConnectionContext context) throws IOException { 229 throw new IllegalStateException(); 230 } 231 232 public void setBrokerName(String brokerName) { 233 for (PersistenceAdapter persistenceAdapter : adapters) { 234 persistenceAdapter.setBrokerName(brokerName); 235 } 236 } 237 238 public void setUsageManager(SystemUsage usageManager) { 239 for (PersistenceAdapter persistenceAdapter : adapters) { 240 persistenceAdapter.setUsageManager(usageManager); 241 } 242 } 243 244 public long size() { 245 long size = 0; 246 for (PersistenceAdapter persistenceAdapter : adapters) { 247 size += persistenceAdapter.size(); 248 } 249 return size; 250 } 251 252 public void start() throws Exception { 253 Object result = this.chooseValue(matchAll); 254 if (result != null) { 255 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result; 256 if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) { 257 findAndRegisterExistingAdapters(filteredAdapter); 258 } 259 } 260 for (PersistenceAdapter persistenceAdapter : adapters) { 261 persistenceAdapter.start(); 262 } 263 } 264 265 private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) { 266 FileFilter destinationNames = new FileFilter() { 267 @Override 268 public boolean accept(File file) { 269 return file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); 270 } 271 }; 272 File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames); 273 if (candidates != null) { 274 for (File candidate : candidates) { 275 registerExistingAdapter(template, candidate); 276 } 277 } 278 } 279 280 private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) { 281 KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName()); 282 startAdapter(adapter, candidate.getName()); 283 registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]); 284 } 285 286 private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) { 287 KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination)); 288 return registerAdapter(adapter, destination); 289 } 290 291 private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) { 292 KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template); 293 configureAdapter(adapter); 294 configureDirectory(adapter, destinationName); 295 return adapter; 296 } 297 298 private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) { 299 File directory = null; 300 if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) { 301 // not set so inherit from mkahadb 302 directory = getDirectory(); 303 } else { 304 directory = adapter.getDirectory(); 305 } 306 if (fileName != null) { 307 directory = new File(directory, fileName); 308 } 309 adapter.setDirectory(directory); 310 } 311 312 private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) { 313 adapters.add(adapter); 314 FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter); 315 put(destination, result); 316 return result; 317 } 318 319 private void configureAdapter(KahaDBPersistenceAdapter adapter) { 320 // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans 321 adapter.getStore().setTransactionIdTransformer(transactionIdTransformer); 322 adapter.setBrokerService(getBrokerService()); 323 } 324 325 private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) { 326 Map<String, Object> configuration = new HashMap<String, Object>(); 327 IntrospectionSupport.getProperties(template, configuration, null); 328 KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); 329 IntrospectionSupport.setProperties(adapter, configuration); 330 return adapter; 331 } 332 333 public void stop() throws Exception { 334 for (PersistenceAdapter persistenceAdapter : adapters) { 335 persistenceAdapter.stop(); 336 } 337 } 338 339 public File getDirectory() { 340 return this.directory; 341 } 342 343 @Override 344 public void setDirectory(File directory) { 345 this.directory = directory; 346 } 347 348 public void setBrokerService(BrokerService brokerService) { 349 for (KahaDBPersistenceAdapter persistenceAdapter : adapters) { 350 persistenceAdapter.setBrokerService(brokerService); 351 } 352 this.brokerService = brokerService; 353 } 354 355 public BrokerService getBrokerService() { 356 return brokerService; 357 } 358 359 public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) { 360 this.transactionStore = transactionStore; 361 } 362 363 /** 364 * Set the max file length of the transaction journal 365 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 366 * be used 367 * 368 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 369 */ 370 public void setJournalMaxFileLength(int maxFileLength) { 371 transactionStore.setJournalMaxFileLength(maxFileLength); 372 } 373 374 public int getJournalMaxFileLength() { 375 return transactionStore.getJournalMaxFileLength(); 376 } 377 378 /** 379 * Set the max write batch size of the transaction journal 380 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 381 * be used 382 * 383 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 384 */ 385 public void setJournalWriteBatchSize(int journalWriteBatchSize) { 386 transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize); 387 } 388 389 public int getJournalWriteBatchSize() { 390 return transactionStore.getJournalMaxWriteBatchSize(); 391 } 392 393 @Override 394 public String toString() { 395 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 396 return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters; 397 } 398 399}