001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.NoSuchElementException;
026import java.util.Set;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicReference;
029
030import org.apache.activemq.management.SizeStatisticImpl;
031import org.apache.activemq.store.PList;
032import org.apache.activemq.store.PListEntry;
033import org.apache.activemq.store.kahadb.disk.index.ListIndex;
034import org.apache.activemq.store.kahadb.disk.index.ListNode;
035import org.apache.activemq.store.kahadb.disk.journal.Location;
036import org.apache.activemq.store.kahadb.disk.page.Transaction;
037import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
038import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
039import org.apache.activemq.util.ByteSequence;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043public class PListImpl extends ListIndex<String, Location> implements PList {
044    static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
045    final PListStoreImpl store;
046    private String name;
047    Object indexLock;
048    private final SizeStatisticImpl messageSize;
049
050    PListImpl(PListStoreImpl store) {
051        this.store = store;
052        this.indexLock = store.getIndexLock();
053        setPageFile(store.getPageFile());
054        setKeyMarshaller(StringMarshaller.INSTANCE);
055        setValueMarshaller(LocationMarshaller.INSTANCE);
056
057        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
058        messageSize.setEnabled(true);
059    }
060
061    public void setName(String name) {
062        this.name = name;
063    }
064
065    @Override
066    public String getName() {
067        return this.name;
068    }
069
070    void read(DataInput in) throws IOException {
071        setHeadPageId(in.readLong());
072    }
073
074    public void write(DataOutput out) throws IOException {
075        out.writeLong(getHeadPageId());
076    }
077
078    @Override
079    public synchronized void destroy() throws IOException {
080        synchronized (indexLock) {
081            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
082                @Override
083                public void execute(Transaction tx) throws IOException {
084                    clear(tx);
085                    unload(tx);
086                }
087            });
088        }
089    }
090
091    class Locator {
092        final String id;
093
094        Locator(String id) {
095            this.id = id;
096        }
097
098        PListImpl plist() {
099            return PListImpl.this;
100        }
101    }
102
103    @Override
104    public Object addLast(final String id, final ByteSequence bs) throws IOException {
105        final Location location = this.store.write(bs, false);
106        synchronized (indexLock) {
107            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
108                @Override
109                public void execute(Transaction tx) throws IOException {
110                    add(tx, id, location);
111                }
112            });
113        }
114        return new Locator(id);
115    }
116
117    @Override
118    public Object addFirst(final String id, final ByteSequence bs) throws IOException {
119        final Location location = this.store.write(bs, false);
120        synchronized (indexLock) {
121            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
122                @Override
123                public void execute(Transaction tx) throws IOException {
124                    addFirst(tx, id, location);
125                }
126            });
127        }
128        return new Locator(id);
129    }
130
131    @Override
132    public boolean remove(final Object l) throws IOException {
133        Locator locator = (Locator) l;
134        assert locator!=null;
135        assert locator.plist()==this;
136        return remove(locator.id);
137    }
138
139    public boolean remove(final String id) throws IOException {
140        final AtomicBoolean result = new AtomicBoolean();
141        synchronized (indexLock) {
142            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
143                @Override
144                public void execute(Transaction tx) throws IOException {
145                    result.set(remove(tx, id) != null);
146                }
147            });
148        }
149        return result.get();
150    }
151
152    public boolean remove(final long position) throws IOException {
153        final AtomicBoolean result = new AtomicBoolean();
154        synchronized (indexLock) {
155            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
156                @Override
157                public void execute(Transaction tx) throws IOException {
158                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
159                    if (iterator.hasNext()) {
160                        iterator.next();
161                        iterator.remove();
162                        result.set(true);
163                    } else {
164                        result.set(false);
165                    }
166                }
167            });
168        }
169        return result.get();
170    }
171
172    public PListEntry get(final long position) throws IOException {
173        PListEntry result = null;
174        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
175        synchronized (indexLock) {
176            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
177                @Override
178                public void execute(Transaction tx) throws IOException {
179                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
180                    ref.set(iterator.next());
181                }
182            });
183        }
184        if (ref.get() != null) {
185            ByteSequence bs = this.store.getPayload(ref.get().getValue());
186            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
187        }
188        return result;
189    }
190
191    public PListEntry getFirst() throws IOException {
192        PListEntry result = null;
193        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
194        synchronized (indexLock) {
195            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
196                @Override
197                public void execute(Transaction tx) throws IOException {
198                    ref.set(getFirst(tx));
199                }
200            });
201        }
202        if (ref.get() != null) {
203            ByteSequence bs = this.store.getPayload(ref.get().getValue());
204            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
205        }
206        return result;
207    }
208
209    public PListEntry getLast() throws IOException {
210        PListEntry result = null;
211        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
212        synchronized (indexLock) {
213            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
214                @Override
215                public void execute(Transaction tx) throws IOException {
216                    ref.set(getLast(tx));
217                }
218            });
219        }
220        if (ref.get() != null) {
221            ByteSequence bs = this.store.getPayload(ref.get().getValue());
222            result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey()));
223        }
224        return result;
225    }
226
227    @Override
228    public boolean isEmpty() {
229        return size() == 0;
230    }
231
232    @Override
233    public PListIterator iterator() throws IOException {
234        return new PListIteratorImpl();
235    }
236
237    final class PListIteratorImpl implements PListIterator {
238        final Iterator<Map.Entry<String, Location>> iterator;
239        final Transaction tx;
240
241        PListIteratorImpl() throws IOException {
242            tx = store.pageFile.tx();
243            synchronized (indexLock) {
244                this.iterator = iterator(tx);
245            }
246        }
247
248        @Override
249        public boolean hasNext() {
250            return iterator.hasNext();
251        }
252
253        @Override
254        public PListEntry next() {
255            Map.Entry<String, Location> entry = iterator.next();
256            ByteSequence bs = null;
257            try {
258                bs = store.getPayload(entry.getValue());
259            } catch (IOException unexpected) {
260                NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
261                e.initCause(unexpected);
262                throw e;
263            }
264            return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey()));
265        }
266
267        @Override
268        public void remove() {
269            try {
270                synchronized (indexLock) {
271                    tx.execute(new Transaction.Closure<IOException>() {
272                        @Override
273                        public void execute(Transaction tx) throws IOException {
274                            iterator.remove();
275                        }
276                    });
277                }
278            } catch (IOException unexpected) {
279                IllegalStateException e = new IllegalStateException(unexpected);
280                e.initCause(unexpected);
281                throw e;
282            }
283        }
284
285        @Override
286        public void release() {
287            try {
288                tx.rollback();
289            } catch (IOException unexpected) {
290                IllegalStateException e = new IllegalStateException(unexpected);
291                e.initCause(unexpected);
292                throw e;
293            }
294        }
295    }
296
297    public void claimFileLocations(final Set<Integer> candidates) throws IOException {
298        synchronized (indexLock) {
299            if (loaded.get()) {
300                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
301                    @Override
302                    public void execute(Transaction tx) throws IOException {
303                        Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
304                        while (iterator.hasNext()) {
305                            Location location = iterator.next().getValue();
306                            candidates.remove(location.getDataFileId());
307                        }
308                    }
309                });
310            }
311        }
312    }
313
314    @Override
315    public long messageSize() {
316        return messageSize.getTotalSize();
317    }
318
319    @Override
320    public synchronized Location add(Transaction tx, String key, Location value)
321            throws IOException {
322        Location location = super.add(tx, key, value);
323        messageSize.addSize(value.getSize());
324        return location;
325    }
326
327    @Override
328    public synchronized Location addFirst(Transaction tx, String key,
329            Location value) throws IOException {
330        Location location = super.addFirst(tx, key, value);
331        messageSize.addSize(value.getSize());
332        return location;
333    }
334
335    @Override
336    public synchronized void clear(Transaction tx) throws IOException {
337        messageSize.reset();
338        super.clear(tx);
339    }
340
341    @Override
342    protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) {
343        try {
344            Iterator<Entry<String, Location>> i = node.iterator(tx);
345            while (i.hasNext()) {
346                messageSize.addSize(i.next().getValue().getSize());
347            }
348        } catch (IOException e) {
349            LOG.warn("could not increment message size", e);
350        }
351    }
352
353    @Override
354    public void onRemove(Entry<String, Location> removed) {
355        super.onRemove(removed);
356        if (removed != null) {
357            messageSize.addSize(-removed.getValue().getSize());
358        }
359    }
360
361    @Override
362    public String toString() {
363        return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
364    }
365}