001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.kahadb.index; 018 019 import java.io.IOException; 020 import java.lang.ref.WeakReference; 021 import java.util.Iterator; 022 import java.util.Map; 023 import java.util.concurrent.atomic.AtomicBoolean; 024 import java.util.concurrent.atomic.AtomicLong; 025 026 import org.apache.kahadb.index.ListNode.ListIterator; 027 import org.apache.kahadb.page.Page; 028 import org.apache.kahadb.page.PageFile; 029 import org.apache.kahadb.page.Transaction; 030 import org.apache.kahadb.util.Marshaller; 031 import org.slf4j.Logger; 032 import org.slf4j.LoggerFactory; 033 034 public class ListIndex<Key,Value> implements Index<Key,Value> { 035 036 private static final Logger LOG = LoggerFactory.getLogger(ListIndex.class); 037 public final static long NOT_SET = -1; 038 protected PageFile pageFile; 039 protected long headPageId; 040 protected long tailPageId; 041 private AtomicLong size = new AtomicLong(0); 042 043 protected AtomicBoolean loaded = new AtomicBoolean(); 044 045 private ListNode.NodeMarshaller<Key, Value> marshaller; 046 private Marshaller<Key> keyMarshaller; 047 private Marshaller<Value> valueMarshaller; 048 049 public ListIndex() { 050 } 051 052 public ListIndex(PageFile pageFile, long headPageId) { 053 this.pageFile = pageFile; 054 setHeadPageId(headPageId); 055 } 056 057 @SuppressWarnings("rawtypes") 058 public ListIndex(PageFile pageFile, Page page) { 059 this(pageFile, page.getPageId()); 060 } 061 062 synchronized public void load(Transaction tx) throws IOException { 063 if (loaded.compareAndSet(false, true)) { 064 LOG.debug("loading"); 065 if( keyMarshaller == null ) { 066 throw new IllegalArgumentException("The key marshaller must be set before loading the ListIndex"); 067 } 068 if( valueMarshaller == null ) { 069 throw new IllegalArgumentException("The value marshaller must be set before loading the ListIndex"); 070 } 071 072 marshaller = new ListNode.NodeMarshaller<Key, Value>(keyMarshaller, valueMarshaller); 073 final Page<ListNode<Key,Value>> p = tx.load(getHeadPageId(), null); 074 if( p.getType() == Page.PAGE_FREE_TYPE ) { 075 // Need to initialize it.. 076 ListNode<Key, Value> root = createNode(p); 077 storeNode(tx, root, true); 078 setHeadPageId(p.getPageId()); 079 setTailPageId(getHeadPageId()); 080 } else { 081 ListNode<Key, Value> node = loadNode(tx, getHeadPageId()); 082 setTailPageId(getHeadPageId()); 083 size.addAndGet(node.size(tx)); 084 while (node.getNext() != NOT_SET ) { 085 node = loadNode(tx, node.getNext()); 086 size.addAndGet(node.size(tx)); 087 setTailPageId(node.getPageId()); 088 } 089 } 090 } 091 } 092 093 synchronized public void unload(Transaction tx) { 094 if (loaded.compareAndSet(true, false)) { 095 } 096 } 097 098 protected ListNode<Key,Value> getHead(Transaction tx) throws IOException { 099 return loadNode(tx, getHeadPageId()); 100 } 101 102 protected ListNode<Key,Value> getTail(Transaction tx) throws IOException { 103 return loadNode(tx, getTailPageId()); 104 } 105 106 synchronized public boolean containsKey(Transaction tx, Key key) throws IOException { 107 assertLoaded(); 108 109 if (size.get() == 0) { 110 return false; 111 } 112 113 for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) { 114 Map.Entry<Key,Value> candidate = iterator.next(); 115 if (key.equals(candidate.getKey())) { 116 return true; 117 } 118 } 119 return false; 120 } 121 122 private ListNode<Key, Value> lastGetNodeCache = null; 123 private Map.Entry<Key, Value> lastGetEntryCache = null; 124 private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null); 125 126 @SuppressWarnings({ "rawtypes", "unchecked" }) 127 synchronized public Value get(Transaction tx, Key key) throws IOException { 128 assertLoaded(); 129 for (Iterator<Map.Entry<Key,Value>> iterator = iterator(tx); iterator.hasNext(); ) { 130 Map.Entry<Key,Value> candidate = iterator.next(); 131 if (key.equals(candidate.getKey())) { 132 this.lastGetNodeCache = ((ListIterator) iterator).getCurrent(); 133 this.lastGetEntryCache = candidate; 134 this.lastCacheTxSrc = new WeakReference<Transaction>(tx); 135 return candidate.getValue(); 136 } 137 } 138 return null; 139 } 140 141 /** 142 * Update the value of the item with the given key in the list if ot exists, otherwise 143 * it appends the value to the end of the list. 144 * 145 * @return the old value contained in the list if one exists or null. 146 */ 147 @SuppressWarnings({ "rawtypes" }) 148 synchronized public Value put(Transaction tx, Key key, Value value) throws IOException { 149 150 Value oldValue = null; 151 152 if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) { 153 154 if(lastGetEntryCache.getKey().equals(key)) { 155 oldValue = lastGetEntryCache.setValue(value); 156 lastGetEntryCache.setValue(value); 157 lastGetNodeCache.storeUpdate(tx); 158 flushCache(); 159 return oldValue; 160 } 161 162 // This searches from the last location of a call to get for the element to replace 163 // all the way to the end of the ListIndex. 164 Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx); 165 while (iterator.hasNext()) { 166 Map.Entry<Key, Value> entry = iterator.next(); 167 if (entry.getKey().equals(key)) { 168 oldValue = entry.setValue(value); 169 ((ListIterator) iterator).getCurrent().storeUpdate(tx); 170 flushCache(); 171 return oldValue; 172 } 173 } 174 } else { 175 flushCache(); 176 } 177 178 // Not found because the cache wasn't set or its not at the end of the list so we 179 // start from the beginning and go to the cached location or the end, then we do 180 // an add if its not found. 181 Iterator<Map.Entry<Key, Value>> iterator = iterator(tx); 182 while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) { 183 Map.Entry<Key, Value> entry = iterator.next(); 184 if (entry.getKey().equals(key)) { 185 oldValue = entry.setValue(value); 186 ((ListIterator) iterator).getCurrent().storeUpdate(tx); 187 flushCache(); 188 return oldValue; 189 } 190 } 191 192 // Not found so add it last. 193 flushCache(); 194 195 return add(tx, key, value); 196 } 197 198 synchronized public Value add(Transaction tx, Key key, Value value) throws IOException { 199 assertLoaded(); 200 getTail(tx).put(tx, key, value); 201 size.incrementAndGet(); 202 flushCache(); 203 return null; 204 } 205 206 synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException { 207 assertLoaded(); 208 getHead(tx).addFirst(tx, key, value); 209 size.incrementAndGet(); 210 flushCache(); 211 return null; 212 } 213 214 @SuppressWarnings("rawtypes") 215 synchronized public Value remove(Transaction tx, Key key) throws IOException { 216 assertLoaded(); 217 218 if (size.get() == 0) { 219 return null; 220 } 221 222 if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) { 223 224 // This searches from the last location of a call to get for the element to remove 225 // all the way to the end of the ListIndex. 226 Iterator<Map.Entry<Key, Value>> iterator = lastGetNodeCache.iterator(tx); 227 while (iterator.hasNext()) { 228 Map.Entry<Key, Value> entry = iterator.next(); 229 if (entry.getKey().equals(key)) { 230 iterator.remove(); 231 flushCache(); 232 return entry.getValue(); 233 } 234 } 235 } else { 236 flushCache(); 237 } 238 239 // Not found because the cache wasn't set or its not at the end of the list so we 240 // start from the beginning and go to the cached location or the end to find the 241 // element to remove. 242 Iterator<Map.Entry<Key, Value>> iterator = iterator(tx); 243 while (iterator.hasNext() && ((ListIterator) iterator).getCurrent() != lastGetNodeCache) { 244 Map.Entry<Key, Value> entry = iterator.next(); 245 if (entry.getKey().equals(key)) { 246 iterator.remove(); 247 flushCache(); 248 return entry.getValue(); 249 } 250 } 251 252 return null; 253 } 254 255 public void onRemove() { 256 size.decrementAndGet(); 257 flushCache(); 258 } 259 260 public boolean isTransient() { 261 return false; 262 } 263 264 synchronized public void clear(Transaction tx) throws IOException { 265 for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) { 266 ListNode<Key,Value>candidate = iterator.next(); 267 candidate.clear(tx); 268 // break up the transaction 269 tx.commit(); 270 } 271 flushCache(); 272 size.set(0); 273 } 274 275 synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException { 276 return getHead(tx).listNodeIterator(tx); 277 } 278 279 synchronized public boolean isEmpty(final Transaction tx) throws IOException { 280 return getHead(tx).isEmpty(tx); 281 } 282 283 synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException { 284 return getHead(tx).iterator(tx); 285 } 286 287 synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException { 288 return getHead(tx).iterator(tx, initialPosition); 289 } 290 291 synchronized public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException { 292 return getHead(tx).getFirst(tx); 293 } 294 295 synchronized public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException { 296 return getTail(tx).getLast(tx); 297 } 298 299 private void assertLoaded() throws IllegalStateException { 300 if( !loaded.get() ) { 301 throw new IllegalStateException("TheListIndex is not loaded"); 302 } 303 } 304 305 ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException { 306 Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller); 307 ListNode<Key, Value> node = page.get(); 308 node.setPage(page); 309 node.setContainingList(this); 310 return node; 311 } 312 313 ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException { 314 ListNode<Key,Value> node = new ListNode<Key,Value>(); 315 node.setPage(page); 316 page.set(node); 317 node.setContainingList(this); 318 return node; 319 } 320 321 public ListNode<Key,Value> createNode(Transaction tx) throws IOException { 322 return createNode(tx.<ListNode<Key,Value>>load(tx.<ListNode<Key,Value>>allocate().getPageId(), null)); 323 } 324 325 public void storeNode(Transaction tx, ListNode<Key,Value> node, boolean overflow) throws IOException { 326 tx.store(node.getPage(), marshaller, overflow); 327 flushCache(); 328 } 329 330 public PageFile getPageFile() { 331 return pageFile; 332 } 333 334 public void setPageFile(PageFile pageFile) { 335 this.pageFile = pageFile; 336 } 337 338 public long getHeadPageId() { 339 return headPageId; 340 } 341 342 public void setHeadPageId(long headPageId) { 343 this.headPageId = headPageId; 344 } 345 346 public Marshaller<Key> getKeyMarshaller() { 347 return keyMarshaller; 348 } 349 public void setKeyMarshaller(Marshaller<Key> keyMarshaller) { 350 this.keyMarshaller = keyMarshaller; 351 } 352 353 public Marshaller<Value> getValueMarshaller() { 354 return valueMarshaller; 355 } 356 public void setValueMarshaller(Marshaller<Value> valueMarshaller) { 357 this.valueMarshaller = valueMarshaller; 358 } 359 360 public void setTailPageId(long tailPageId) { 361 this.tailPageId = tailPageId; 362 } 363 364 public long getTailPageId() { 365 return tailPageId; 366 } 367 368 public long size() { 369 return size.get(); 370 } 371 372 private void flushCache() { 373 this.lastGetEntryCache = null; 374 this.lastGetNodeCache = null; 375 this.lastCacheTxSrc.clear(); 376 } 377 }