001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb.plist; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.IOException; 022 import java.util.Iterator; 023 import java.util.Map; 024 import java.util.NoSuchElementException; 025 import java.util.Set; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 import java.util.concurrent.atomic.AtomicReference; 028 import org.apache.kahadb.index.ListIndex; 029 import org.apache.kahadb.journal.Location; 030 import org.apache.kahadb.page.Transaction; 031 import org.apache.kahadb.util.ByteSequence; 032 import org.apache.kahadb.util.LocationMarshaller; 033 import org.apache.kahadb.util.StringMarshaller; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 037 public class PList extends ListIndex<String, Location> { 038 static final Logger LOG = LoggerFactory.getLogger(PList.class); 039 final PListStore store; 040 private String name; 041 Object indexLock; 042 043 PList(PListStore store) { 044 this.store = store; 045 this.indexLock = store.getIndexLock(); 046 setPageFile(store.getPageFile()); 047 setKeyMarshaller(StringMarshaller.INSTANCE); 048 setValueMarshaller(LocationMarshaller.INSTANCE); 049 } 050 051 public void setName(String name) { 052 this.name = name; 053 } 054 055 public String getName() { 056 return this.name; 057 } 058 059 void read(DataInput in) throws IOException { 060 setHeadPageId(in.readLong()); 061 } 062 063 public void write(DataOutput out) throws IOException { 064 out.writeLong(getHeadPageId()); 065 } 066 067 public synchronized void destroy() throws IOException { 068 synchronized (indexLock) { 069 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 070 public void execute(Transaction tx) throws IOException { 071 clear(tx); 072 unload(tx); 073 } 074 }); 075 } 076 } 077 078 public void addLast(final String id, final ByteSequence bs) throws IOException { 079 final Location location = this.store.write(bs, false); 080 synchronized (indexLock) { 081 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 082 public void execute(Transaction tx) throws IOException { 083 add(tx, id, location); 084 } 085 }); 086 } 087 } 088 089 public void addFirst(final String id, final ByteSequence bs) throws IOException { 090 final Location location = this.store.write(bs, false); 091 synchronized (indexLock) { 092 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 093 public void execute(Transaction tx) throws IOException { 094 addFirst(tx, id, location); 095 } 096 }); 097 } 098 } 099 100 public boolean remove(final String id) throws IOException { 101 final AtomicBoolean result = new AtomicBoolean(); 102 synchronized (indexLock) { 103 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 104 public void execute(Transaction tx) throws IOException { 105 result.set(remove(tx, id) != null); 106 } 107 }); 108 } 109 return result.get(); 110 } 111 112 public boolean remove(final long position) throws IOException { 113 final AtomicBoolean result = new AtomicBoolean(); 114 synchronized (indexLock) { 115 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 116 public void execute(Transaction tx) throws IOException { 117 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); 118 if (iterator.hasNext()) { 119 iterator.next(); 120 iterator.remove(); 121 result.set(true); 122 } else { 123 result.set(false); 124 } 125 } 126 }); 127 } 128 return result.get(); 129 } 130 131 public PListEntry get(final long position) throws IOException { 132 PListEntry result = null; 133 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 134 synchronized (indexLock) { 135 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 136 public void execute(Transaction tx) throws IOException { 137 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); 138 ref.set(iterator.next()); 139 } 140 }); 141 } 142 if (ref.get() != null) { 143 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 144 result = new PListEntry(ref.get().getKey(), bs); 145 } 146 return result; 147 } 148 149 public PListEntry getFirst() throws IOException { 150 PListEntry result = null; 151 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 152 synchronized (indexLock) { 153 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 154 public void execute(Transaction tx) throws IOException { 155 ref.set(getFirst(tx)); 156 } 157 }); 158 } 159 if (ref.get() != null) { 160 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 161 result = new PListEntry(ref.get().getKey(), bs); 162 } 163 return result; 164 } 165 166 public PListEntry getLast() throws IOException { 167 PListEntry result = null; 168 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 169 synchronized (indexLock) { 170 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 171 public void execute(Transaction tx) throws IOException { 172 ref.set(getLast(tx)); 173 } 174 }); 175 } 176 if (ref.get() != null) { 177 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 178 result = new PListEntry(ref.get().getKey(), bs); 179 } 180 return result; 181 } 182 183 public boolean isEmpty() { 184 return size() == 0; 185 } 186 187 public PListIterator iterator() throws IOException { 188 return new PListIterator(); 189 } 190 191 public final class PListIterator implements Iterator<PListEntry> { 192 final Iterator<Map.Entry<String, Location>> iterator; 193 final Transaction tx; 194 195 PListIterator() throws IOException { 196 tx = store.pageFile.tx(); 197 synchronized (indexLock) { 198 this.iterator = iterator(tx); 199 } 200 } 201 202 @Override 203 public boolean hasNext() { 204 return iterator.hasNext(); 205 } 206 207 @Override 208 public PListEntry next() { 209 Map.Entry<String, Location> entry = iterator.next(); 210 ByteSequence bs = null; 211 try { 212 bs = store.getPayload(entry.getValue()); 213 } catch (IOException unexpected) { 214 NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage()); 215 e.initCause(unexpected); 216 throw e; 217 } 218 return new PListEntry(entry.getKey(), bs); 219 } 220 221 @Override 222 public void remove() { 223 try { 224 synchronized (indexLock) { 225 tx.execute(new Transaction.Closure<IOException>() { 226 @Override 227 public void execute(Transaction tx) throws IOException { 228 iterator.remove(); 229 } 230 }); 231 } 232 } catch (IOException unexpected) { 233 IllegalStateException e = new IllegalStateException(unexpected); 234 e.initCause(unexpected); 235 throw e; 236 } 237 } 238 239 public void release() { 240 try { 241 tx.rollback(); 242 } catch (IOException unexpected) { 243 IllegalStateException e = new IllegalStateException(unexpected); 244 e.initCause(unexpected); 245 throw e; 246 } 247 } 248 } 249 250 public void claimFileLocations(final Set<Integer> candidates) throws IOException { 251 synchronized (indexLock) { 252 if (loaded.get()) { 253 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 254 public void execute(Transaction tx) throws IOException { 255 Iterator<Map.Entry<String,Location>> iterator = iterator(tx); 256 while (iterator.hasNext()) { 257 Location location = iterator.next().getValue(); 258 candidates.remove(location.getDataFileId()); 259 } 260 } 261 }); 262 } 263 } 264 } 265 266 @Override 267 public String toString() { 268 return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]"; 269 } 270 }