001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.usage;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.activemq.Service;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * Used to keep track of how much of something is being used so that a
032 * productive working set usage can be controlled. Main use case is manage
033 * memory usage.
034 * 
035 * @org.apache.xbean.XBean
036 * 
037 */
038public abstract class Usage<T extends Usage> implements Service {
039
040    private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
041    protected final Object usageMutex = new Object();
042    protected int percentUsage;
043    protected T parent;
044    private UsageCapacity limiter = new DefaultUsageCapacity();
045    private int percentUsageMinDelta = 1;
046    private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
047    private final boolean debug = LOG.isDebugEnabled();
048    protected String name;
049    private float usagePortion = 1.0f;
050    private final List<T> children = new CopyOnWriteArrayList<T>();
051    private final List<Runnable> callbacks = new LinkedList<Runnable>();
052    private int pollingTime = 100;
053    private final AtomicBoolean started=new AtomicBoolean();
054    private ThreadPoolExecutor executor;
055    public Usage(T parent, String name, float portion) {
056        this.parent = parent;
057        this.usagePortion = portion;
058        if (parent != null) {
059            this.limiter.setLimit((long)(parent.getLimit() * portion));
060            name = parent.name + ":" + name;
061        }
062        this.name = name;
063    }
064
065    protected abstract long retrieveUsage();
066
067    /**
068     * @throws InterruptedException
069     */
070    public void waitForSpace() throws InterruptedException {
071        waitForSpace(0);
072    }
073
074    public boolean waitForSpace(long timeout) throws InterruptedException {
075        return waitForSpace(timeout, 100);
076    }
077    
078    /**
079     * @param timeout
080     * @throws InterruptedException
081     * @return true if space
082     */
083    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
084        if (parent != null) {
085            if (!parent.waitForSpace(timeout, highWaterMark)) {
086                return false;
087            }
088        }
089        synchronized (usageMutex) {
090            percentUsage=caclPercentUsage();
091            if (percentUsage >= highWaterMark) {
092                long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
093                long timeleft = deadline;
094                while (timeleft > 0) {
095                    percentUsage=caclPercentUsage();
096                    if (percentUsage >= highWaterMark) {
097                        usageMutex.wait(pollingTime);
098                        timeleft = deadline - System.currentTimeMillis();
099                    } else {
100                        break;
101                    }
102                }
103            }
104            return percentUsage < highWaterMark;
105        }
106    }
107
108    public boolean isFull() {
109        return isFull(100);
110    }
111    
112    public boolean isFull(int highWaterMark) {
113        if (parent != null && parent.isFull(highWaterMark)) {
114            return true;
115        }
116        synchronized (usageMutex) {
117            percentUsage=caclPercentUsage();
118            return percentUsage >= highWaterMark;
119        }
120    }
121
122    public void addUsageListener(UsageListener listener) {
123        listeners.add(listener);
124    }
125
126    public void removeUsageListener(UsageListener listener) {
127        listeners.remove(listener);
128    }
129
130    public long getLimit() {
131        synchronized (usageMutex) {
132            return limiter.getLimit();
133        }
134    }
135
136    /**
137     * Sets the memory limit in bytes. Setting the limit in bytes will set the
138     * usagePortion to 0 since the UsageManager is not going to be portion based
139     * off the parent.
140     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
141     * 
142     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
143     */
144    public void setLimit(long limit) {
145        if (percentUsageMinDelta < 0) {
146            throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
147        }
148        synchronized (usageMutex) {
149            this.limiter.setLimit(limit);
150            this.usagePortion = 0;
151        }
152        onLimitChange();
153    }
154
155    protected void onLimitChange() {
156        // We may need to calculate the limit
157        if (usagePortion > 0 && parent != null) {
158            synchronized (usageMutex) {
159                this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
160            }
161        }
162        // Reset the percent currently being used.
163        int percentUsage;
164        synchronized (usageMutex) {
165            percentUsage = caclPercentUsage();
166        }
167        setPercentUsage(percentUsage);
168        // Let the children know that the limit has changed. They may need to
169        // set
170        // their limits based on ours.
171        for (T child : children) {
172            child.onLimitChange();
173        }
174    }
175
176    public float getUsagePortion() {
177        synchronized (usageMutex) {
178            return usagePortion;
179        }
180    }
181
182    public void setUsagePortion(float usagePortion) {
183        synchronized (usageMutex) {
184            this.usagePortion = usagePortion;
185        }
186        onLimitChange();
187    }
188
189    public int getPercentUsage() {
190        synchronized (usageMutex) {
191            return percentUsage;
192        }
193    }
194
195    public int getPercentUsageMinDelta() {
196        synchronized (usageMutex) {
197            return percentUsageMinDelta;
198        }
199    }
200
201    /**
202     * Sets the minimum number of percentage points the usage has to change
203     * before a UsageListener event is fired by the manager.
204     * 
205     * @param percentUsageMinDelta
206     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
207     */
208    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
209        if (percentUsageMinDelta < 1) {
210            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
211        }
212        int percentUsage;
213        synchronized (usageMutex) {
214            this.percentUsageMinDelta = percentUsageMinDelta;
215            percentUsage = caclPercentUsage();
216        }
217        setPercentUsage(percentUsage);
218    }
219
220    public long getUsage() {
221        synchronized (usageMutex) {
222            return retrieveUsage();
223        }
224    }
225
226    protected void setPercentUsage(int value) {
227        synchronized (usageMutex) {
228            int oldValue = percentUsage;
229            percentUsage = value;
230            if (oldValue != value) {
231                fireEvent(oldValue, value);
232            }
233        }
234    }
235
236    protected int caclPercentUsage() {
237        if (limiter.getLimit() == 0) {
238            return 0;
239        }
240        return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
241    }
242
243    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
244        if (debug) {
245            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 
246                + newPercentUsage + "% of available memory");
247        }   
248        if (started.get()) {
249            // Switching from being full to not being full..
250            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
251                synchronized (usageMutex) {
252                    usageMutex.notifyAll();
253                    if (!callbacks.isEmpty()) {
254                        for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
255                            Runnable callback = iter.next();
256                            getExecutor().execute(callback);
257                        }
258                        callbacks.clear();
259                    }
260                }
261            }
262            if (!listeners.isEmpty()) {
263                // Let the listeners know on a separate thread
264                Runnable listenerNotifier = new Runnable() {
265                    public void run() {
266                        for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
267                            UsageListener l = iter.next();
268                            l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
269                        }
270                    }
271                };
272                if (started.get()) {
273                    getExecutor().execute(listenerNotifier);
274                } else {
275                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
276                }
277            }
278        }
279    }
280
281    public String getName() {
282        return name;
283    }
284
285    @Override
286    public String toString() {
287        return "Usage(" + getName() + ") percentUsage=" + percentUsage
288                + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
289                + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
290                + (parent != null ? ";Parent:" + parent.toString() : "");
291    }
292
293    @SuppressWarnings("unchecked")
294    public void start() {
295        if (started.compareAndSet(false, true)){
296            if (parent != null) {
297                parent.addChild(this);
298            }
299            for (T t:children) {
300                t.start();
301            }
302        }
303    }
304
305    @SuppressWarnings("unchecked")
306    public void stop() {
307        if (started.compareAndSet(true, false)){
308            if (parent != null) {
309                parent.removeChild(this);
310            }
311            
312            //clear down any callbacks
313            synchronized (usageMutex) {
314                usageMutex.notifyAll();
315                for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
316                    Runnable callback = iter.next();
317                    callback.run();
318                }
319                this.callbacks.clear();
320            }
321            for (T t:children) {
322                t.stop();
323            }
324        }
325    }
326
327    protected void addChild(T child) {
328        children.add(child);
329        if (started.get()) {
330            child.start();
331        }
332    }
333
334    protected void removeChild(T child) {
335        children.remove(child);
336    }
337
338    /**
339     * @param callback
340     * @return true if the UsageManager was full. The callback will only be
341     *         called if this method returns true.
342     */
343    public boolean notifyCallbackWhenNotFull(final Runnable callback) {
344        if (parent != null) {
345            Runnable r = new Runnable() {
346
347                public void run() {
348                    synchronized (usageMutex) {
349                        if (percentUsage >= 100) {
350                            callbacks.add(callback);
351                        } else {
352                            callback.run();
353                        }
354                    }
355                }
356            };
357            if (parent.notifyCallbackWhenNotFull(r)) {
358                return true;
359            }
360        }
361        synchronized (usageMutex) {
362            if (percentUsage >= 100) {
363                callbacks.add(callback);
364                return true;
365            } else {
366                return false;
367            }
368        }
369    }
370
371    /**
372     * @return the limiter
373     */
374    public UsageCapacity getLimiter() {
375        return this.limiter;
376    }
377
378    /**
379     * @param limiter the limiter to set
380     */
381    public void setLimiter(UsageCapacity limiter) {
382        this.limiter = limiter;
383    }
384
385    /**
386     * @return the pollingTime
387     */
388    public int getPollingTime() {
389        return this.pollingTime;
390    }
391
392    /**
393     * @param pollingTime the pollingTime to set
394     */
395    public void setPollingTime(int pollingTime) {
396        this.pollingTime = pollingTime;
397    }
398
399    public void setName(String name) {
400        this.name = name;
401    }
402
403    public T getParent() {
404        return parent;
405    }
406
407    public void setParent(T parent) {
408        this.parent = parent;
409    }
410    
411    public void setExecutor (ThreadPoolExecutor executor) {
412        this.executor = executor;
413    }
414    public ThreadPoolExecutor getExecutor() {
415        return executor;
416    }
417}