001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.util.HashMap; 020 import java.util.HashSet; 021 import java.util.Map; 022 import java.util.Set; 023 import java.util.Timer; 024 import java.util.TimerTask; 025 026 import org.apache.activemq.broker.ConnectionContext; 027 import org.apache.activemq.command.ActiveMQDestination; 028 import org.apache.activemq.thread.TaskRunnerFactory; 029 import org.apache.activemq.usage.SystemUsage; 030 import org.slf4j.Logger; 031 import org.slf4j.LoggerFactory; 032 033 /** 034 * 035 */ 036 public abstract class AbstractTempRegion extends AbstractRegion { 037 private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class); 038 039 private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>(); 040 private final boolean doCacheTempDestinations; 041 private final int purgeTime; 042 private Timer purgeTimer; 043 private TimerTask purgeTask; 044 045 046 /** 047 * @param broker 048 * @param destinationStatistics 049 * @param memoryManager 050 * @param taskRunnerFactory 051 * @param destinationFactory 052 */ 053 public AbstractTempRegion(RegionBroker broker, 054 DestinationStatistics destinationStatistics, 055 SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 056 DestinationFactory destinationFactory) { 057 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, 058 destinationFactory); 059 this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations(); 060 this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations(); 061 if (this.doCacheTempDestinations) { 062 this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true); 063 this.purgeTask = new TimerTask() { 064 public void run() { 065 doPurge(); 066 } 067 068 }; 069 this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime); 070 } 071 072 } 073 074 public void stop() throws Exception { 075 super.stop(); 076 if (purgeTimer != null) { 077 purgeTimer.cancel(); 078 } 079 } 080 081 protected abstract Destination doCreateDestination( 082 ConnectionContext context, ActiveMQDestination destination) 083 throws Exception; 084 085 protected synchronized Destination createDestination( 086 ConnectionContext context, ActiveMQDestination destination) 087 throws Exception { 088 Destination result = cachedDestinations.remove(new CachedDestination( 089 destination)); 090 if (result == null) { 091 result = doCreateDestination(context, destination); 092 } 093 return result; 094 } 095 096 protected final synchronized void dispose(ConnectionContext context, 097 Destination dest) throws Exception { 098 // add to cache 099 if (this.doCacheTempDestinations) { 100 cachedDestinations.put(new CachedDestination(dest 101 .getActiveMQDestination()), dest); 102 }else { 103 try { 104 dest.dispose(context); 105 dest.stop(); 106 } catch (Exception e) { 107 LOG.warn("Failed to dispose of " + dest, e); 108 } 109 } 110 } 111 112 private void doDispose(Destination dest) { 113 ConnectionContext context = new ConnectionContext(); 114 try { 115 dest.dispose(context); 116 dest.stop(); 117 } catch (Exception e) { 118 LOG.warn("Failed to dispose of " + dest, e); 119 } 120 121 } 122 123 private synchronized void doPurge() { 124 long currentTime = System.currentTimeMillis(); 125 if (cachedDestinations.size() > 0) { 126 Set<CachedDestination> tmp = new HashSet<CachedDestination>( 127 cachedDestinations.keySet()); 128 for (CachedDestination key : tmp) { 129 if ((key.timeStamp + purgeTime) < currentTime) { 130 Destination dest = cachedDestinations.remove(key); 131 if (dest != null) { 132 doDispose(dest); 133 } 134 } 135 } 136 } 137 } 138 139 static class CachedDestination { 140 long timeStamp; 141 142 ActiveMQDestination destination; 143 144 CachedDestination(ActiveMQDestination destination) { 145 this.destination = destination; 146 this.timeStamp = System.currentTimeMillis(); 147 } 148 149 public int hashCode() { 150 return destination.hashCode(); 151 } 152 153 public boolean equals(Object o) { 154 if (o instanceof CachedDestination) { 155 CachedDestination other = (CachedDestination) o; 156 return other.destination.equals(this.destination); 157 } 158 return false; 159 } 160 161 } 162 163 }