001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.journal; 018 019import java.io.File; 020import java.io.FileNotFoundException; 021import java.io.FilenameFilter; 022import java.io.IOException; 023import java.io.RandomAccessFile; 024import java.io.UnsupportedEncodingException; 025import java.nio.ByteBuffer; 026import java.nio.channels.ClosedByInterruptException; 027import java.nio.channels.FileChannel; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.LinkedHashMap; 033import java.util.LinkedList; 034import java.util.Map; 035import java.util.Set; 036import java.util.TreeMap; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.Executors; 039import java.util.concurrent.Future; 040import java.util.concurrent.ScheduledExecutorService; 041import java.util.concurrent.ScheduledFuture; 042import java.util.concurrent.ThreadFactory; 043import java.util.concurrent.TimeUnit; 044import java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.atomic.AtomicReference; 046import java.util.zip.Adler32; 047import java.util.zip.Checksum; 048 049import org.apache.activemq.store.kahadb.disk.util.LinkedNode; 050import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; 051import org.apache.activemq.store.kahadb.disk.util.Sequence; 052import org.apache.activemq.util.ByteSequence; 053import org.apache.activemq.util.DataByteArrayInputStream; 054import org.apache.activemq.util.DataByteArrayOutputStream; 055import org.apache.activemq.util.IOHelper; 056import org.apache.activemq.util.RecoverableRandomAccessFile; 057import org.apache.activemq.util.ThreadPoolUtils; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Manages DataFiles 063 */ 064public class Journal { 065 public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER"; 066 public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false")); 067 068 private static final int PREALLOC_CHUNK_SIZE = 1024*1024; 069 070 // ITEM_HEAD_SPACE = length + type+ reserved space + SOR 071 public static final int RECORD_HEAD_SPACE = 4 + 1; 072 073 public static final byte USER_RECORD_TYPE = 1; 074 public static final byte BATCH_CONTROL_RECORD_TYPE = 2; 075 // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 076 public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH"); 077 public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8; 078 public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader(); 079 public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader(); 080 public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt(); 081 public static final byte EOF_EOT = '4'; 082 public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord(); 083 084 private ScheduledExecutorService scheduler; 085 086 // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss 087 public void corruptRecoveryLocation(Location recoveryPosition) throws IOException { 088 DataFile dataFile = getDataFile(recoveryPosition); 089 // with corruption on recovery we have no faith in the content - slip to the next batch record or eof 090 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 091 try { 092 int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1); 093 Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1); 094 LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); 095 096 // skip corruption on getNextLocation 097 recoveryPosition.setOffset((int) sequence.getLast() + 1); 098 recoveryPosition.setSize(-1); 099 100 dataFile.corruptedBlocks.add(sequence); 101 } catch (IOException e) { 102 } finally { 103 accessorPool.closeDataFileAccessor(reader); 104 } 105 } 106 107 public DataFileAccessorPool getAccessorPool() { 108 return accessorPool; 109 } 110 111 public enum PreallocationStrategy { 112 SPARSE_FILE, 113 OS_KERNEL_COPY, 114 ZEROS, 115 CHUNKED_ZEROS; 116 } 117 118 public enum PreallocationScope { 119 ENTIRE_JOURNAL, 120 ENTIRE_JOURNAL_ASYNC, 121 NONE; 122 } 123 124 public enum JournalDiskSyncStrategy { 125 ALWAYS, 126 PERIODIC, 127 NEVER; 128 } 129 130 private static byte[] createBatchControlRecordHeader() { 131 try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) { 132 os.writeInt(BATCH_CONTROL_RECORD_SIZE); 133 os.writeByte(BATCH_CONTROL_RECORD_TYPE); 134 os.write(BATCH_CONTROL_RECORD_MAGIC); 135 ByteSequence sequence = os.toByteSequence(); 136 sequence.compact(); 137 return sequence.getData(); 138 } catch (IOException e) { 139 throw new RuntimeException("Could not create batch control record header.", e); 140 } 141 } 142 143 private static byte[] createEmptyBatchControlRecordHeader() { 144 try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) { 145 os.writeInt(BATCH_CONTROL_RECORD_SIZE); 146 os.writeByte(BATCH_CONTROL_RECORD_TYPE); 147 os.write(BATCH_CONTROL_RECORD_MAGIC); 148 os.writeInt(0); 149 os.writeLong(0l); 150 ByteSequence sequence = os.toByteSequence(); 151 sequence.compact(); 152 return sequence.getData(); 153 } catch (IOException e) { 154 throw new RuntimeException("Could not create empty batch control record header.", e); 155 } 156 } 157 158 private static byte[] createEofBatchAndLocationRecord() { 159 try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) { 160 os.writeInt(EOF_INT); 161 os.writeByte(EOF_EOT); 162 ByteSequence sequence = os.toByteSequence(); 163 sequence.compact(); 164 return sequence.getData(); 165 } catch (IOException e) { 166 throw new RuntimeException("Could not create eof header.", e); 167 } 168 } 169 170 public static final String DEFAULT_DIRECTORY = "."; 171 public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; 172 public static final String DEFAULT_FILE_PREFIX = "db-"; 173 public static final String DEFAULT_FILE_SUFFIX = ".log"; 174 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 175 public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; 176 public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4; 177 178 private static final Logger LOG = LoggerFactory.getLogger(Journal.class); 179 180 protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>(); 181 182 protected File directory = new File(DEFAULT_DIRECTORY); 183 protected File directoryArchive; 184 private boolean directoryArchiveOverridden = false; 185 186 protected String filePrefix = DEFAULT_FILE_PREFIX; 187 protected String fileSuffix = DEFAULT_FILE_SUFFIX; 188 protected boolean started; 189 190 protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; 191 protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE; 192 193 protected FileAppender appender; 194 protected DataFileAccessorPool accessorPool; 195 196 protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>(); 197 protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>(); 198 protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>(); 199 200 protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); 201 protected ScheduledFuture cleanupTask; 202 protected AtomicLong totalLength = new AtomicLong(); 203 protected boolean archiveDataLogs; 204 private ReplicationTarget replicationTarget; 205 protected boolean checksum; 206 protected boolean checkForCorruptionOnStartup; 207 protected boolean enableAsyncDiskSync = true; 208 private int nextDataFileId = 1; 209 private Object dataFileIdLock = new Object(); 210 private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null); 211 private volatile DataFile nextDataFile; 212 213 protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; 214 protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; 215 private File osKernelCopyTemplateFile = null; 216 protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 217 218 public interface DataFileRemovedListener { 219 void fileRemoved(DataFile datafile); 220 } 221 222 private DataFileRemovedListener dataFileRemovedListener; 223 224 public synchronized void start() throws IOException { 225 if (started) { 226 return; 227 } 228 229 long start = System.currentTimeMillis(); 230 accessorPool = new DataFileAccessorPool(this); 231 started = true; 232 233 appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this); 234 235 File[] files = directory.listFiles(new FilenameFilter() { 236 @Override 237 public boolean accept(File dir, String n) { 238 return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix); 239 } 240 }); 241 242 if (files != null) { 243 for (File file : files) { 244 try { 245 String n = file.getName(); 246 String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length()); 247 int num = Integer.parseInt(numStr); 248 DataFile dataFile = new DataFile(file, num); 249 fileMap.put(dataFile.getDataFileId(), dataFile); 250 totalLength.addAndGet(dataFile.getLength()); 251 } catch (NumberFormatException e) { 252 // Ignore file that do not match the pattern. 253 } 254 } 255 256 // Sort the list so that we can link the DataFiles together in the 257 // right order. 258 LinkedList<DataFile> l = new LinkedList<>(fileMap.values()); 259 Collections.sort(l); 260 for (DataFile df : l) { 261 if (df.getLength() == 0) { 262 // possibly the result of a previous failed write 263 LOG.info("ignoring zero length, partially initialised journal data file: " + df); 264 continue; 265 } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) { 266 continue; 267 } 268 dataFiles.addLast(df); 269 fileByFileMap.put(df.getFile(), df); 270 271 if( isCheckForCorruptionOnStartup() ) { 272 lastAppendLocation.set(recoveryCheck(df)); 273 } 274 } 275 } 276 277 if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) { 278 // create a template file that will be used to pre-allocate the journal files 279 if (osKernelCopyTemplateFile == null) { 280 osKernelCopyTemplateFile = createJournalTemplateFile(); 281 } 282 } 283 284 scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { 285 @Override 286 public Thread newThread(Runnable r) { 287 Thread schedulerThread = new Thread(r); 288 schedulerThread.setName("ActiveMQ Journal Scheduled executor"); 289 schedulerThread.setDaemon(true); 290 return schedulerThread; 291 } 292 }); 293 294 // init current write file 295 if (dataFiles.isEmpty()) { 296 nextDataFileId = 1; 297 rotateWriteFile(); 298 } else { 299 currentDataFile.set(dataFiles.getTail()); 300 nextDataFileId = currentDataFile.get().dataFileId + 1; 301 } 302 303 if( lastAppendLocation.get()==null ) { 304 DataFile df = dataFiles.getTail(); 305 lastAppendLocation.set(recoveryCheck(df)); 306 } 307 308 // ensure we don't report unused space of last journal file in size metric 309 if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) { 310 totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength); 311 } 312 313 cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() { 314 @Override 315 public void run() { 316 cleanup(); 317 } 318 }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS); 319 320 long end = System.currentTimeMillis(); 321 LOG.trace("Startup took: "+(end-start)+" ms"); 322 } 323 324 public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) { 325 326 if (PreallocationScope.NONE != preallocationScope) { 327 328 if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { 329 doPreallocationKernelCopy(file); 330 } else if (PreallocationStrategy.ZEROS == preallocationStrategy) { 331 doPreallocationZeros(file); 332 } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) { 333 doPreallocationChunkedZeros(file); 334 } else { 335 doPreallocationSparseFile(file); 336 } 337 } 338 } 339 340 private void doPreallocationSparseFile(RecoverableRandomAccessFile file) { 341 final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD); 342 try { 343 FileChannel channel = file.getChannel(); 344 channel.position(0); 345 channel.write(journalEof); 346 channel.position(maxFileLength - 5); 347 journalEof.rewind(); 348 channel.write(journalEof); 349 channel.force(false); 350 channel.position(0); 351 } catch (ClosedByInterruptException ignored) { 352 LOG.trace("Could not preallocate journal file with sparse file", ignored); 353 } catch (IOException e) { 354 LOG.error("Could not preallocate journal file with sparse file", e); 355 } 356 } 357 358 private void doPreallocationZeros(RecoverableRandomAccessFile file) { 359 ByteBuffer buffer = ByteBuffer.allocate(maxFileLength); 360 buffer.put(EOF_RECORD); 361 buffer.rewind(); 362 try { 363 FileChannel channel = file.getChannel(); 364 channel.write(buffer); 365 channel.force(false); 366 channel.position(0); 367 } catch (ClosedByInterruptException ignored) { 368 LOG.trace("Could not preallocate journal file with zeros", ignored); 369 } catch (IOException e) { 370 LOG.error("Could not preallocate journal file with zeros", e); 371 } 372 } 373 374 private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) { 375 try { 376 RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw"); 377 templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel()); 378 templateRaf.close(); 379 } catch (ClosedByInterruptException ignored) { 380 LOG.trace("Could not preallocate journal file with kernel copy", ignored); 381 } catch (FileNotFoundException e) { 382 LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e); 383 } catch (IOException e) { 384 LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e); 385 } 386 } 387 388 private File createJournalTemplateFile() { 389 String fileName = "db-log.template"; 390 File rc = new File(directory, fileName); 391 try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw");) { 392 templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD)); 393 templateRaf.setLength(maxFileLength); 394 templateRaf.getChannel().force(true); 395 } catch (FileNotFoundException e) { 396 LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e); 397 } catch (IOException e) { 398 LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e); 399 } 400 return rc; 401 } 402 403 private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) { 404 405 ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE); 406 buffer.put(EOF_RECORD); 407 buffer.rewind(); 408 409 try { 410 FileChannel channel = file.getChannel(); 411 412 int remLen = maxFileLength; 413 while (remLen > 0) { 414 if (remLen < buffer.remaining()) { 415 buffer.limit(remLen); 416 } 417 int writeLen = channel.write(buffer); 418 remLen -= writeLen; 419 buffer.rewind(); 420 } 421 422 channel.force(false); 423 channel.position(0); 424 } catch (ClosedByInterruptException ignored) { 425 LOG.trace("Could not preallocate journal file with zeros", ignored); 426 } catch (IOException e) { 427 LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e); 428 } 429 } 430 431 private static byte[] bytes(String string) { 432 try { 433 return string.getBytes("UTF-8"); 434 } catch (UnsupportedEncodingException e) { 435 throw new RuntimeException(e); 436 } 437 } 438 439 public boolean isUnusedPreallocated(DataFile dataFile) throws IOException { 440 int firstBatchRecordSize = -1; 441 if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) { 442 Location location = new Location(); 443 location.setDataFileId(dataFile.getDataFileId()); 444 location.setOffset(0); 445 446 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 447 try { 448 firstBatchRecordSize = checkBatchRecord(reader, location.getOffset()); 449 } catch (Exception ignored) { 450 } finally { 451 accessorPool.closeDataFileAccessor(reader); 452 } 453 } 454 return firstBatchRecordSize == 0; 455 } 456 457 protected Location recoveryCheck(DataFile dataFile) throws IOException { 458 Location location = new Location(); 459 location.setDataFileId(dataFile.getDataFileId()); 460 location.setOffset(0); 461 462 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 463 try { 464 while (true) { 465 int size = checkBatchRecord(reader, location.getOffset()); 466 if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) { 467 if (size == 0) { 468 // eof batch record 469 break; 470 } 471 location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size); 472 } else { 473 474 // Perhaps it's just some corruption... scan through the 475 // file to find the next valid batch record. We 476 // may have subsequent valid batch records. 477 int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1); 478 if (nextOffset >= 0) { 479 Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); 480 LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence); 481 dataFile.corruptedBlocks.add(sequence); 482 location.setOffset(nextOffset); 483 } else { 484 break; 485 } 486 } 487 } 488 489 } catch (IOException e) { 490 } finally { 491 accessorPool.closeDataFileAccessor(reader); 492 } 493 494 int existingLen = dataFile.getLength(); 495 dataFile.setLength(location.getOffset()); 496 if (existingLen > dataFile.getLength()) { 497 totalLength.addAndGet(dataFile.getLength() - existingLen); 498 } 499 500 if (!dataFile.corruptedBlocks.isEmpty()) { 501 // Is the end of the data file corrupted? 502 if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) { 503 dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); 504 } 505 } 506 507 return location; 508 } 509 510 private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException { 511 ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER); 512 byte data[] = new byte[1024*4]; 513 ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data)); 514 515 int pos = 0; 516 while (true) { 517 pos = bs.indexOf(header, pos); 518 if (pos >= 0) { 519 return offset + pos; 520 } else { 521 // need to load the next data chunck in.. 522 if (bs.length != data.length) { 523 // If we had a short read then we were at EOF 524 return -1; 525 } 526 offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length; 527 bs = new ByteSequence(data, 0, reader.read(offset, data)); 528 pos = 0; 529 } 530 } 531 } 532 533 public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException { 534 byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE]; 535 536 try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) { 537 538 reader.readFully(offset, controlRecord); 539 540 // check for journal eof 541 if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) { 542 // eof batch 543 return 0; 544 } 545 546 // Assert that it's a batch record. 547 for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) { 548 if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) { 549 return -1; 550 } 551 } 552 553 int size = controlIs.readInt(); 554 if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) { 555 return -1; 556 } 557 558 if (isChecksum()) { 559 560 long expectedChecksum = controlIs.readLong(); 561 if (expectedChecksum == 0) { 562 // Checksuming was not enabled when the record was stored. 563 // we can't validate the record :( 564 return size; 565 } 566 567 byte data[] = new byte[size]; 568 reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data); 569 570 Checksum checksum = new Adler32(); 571 checksum.update(data, 0, data.length); 572 573 if (expectedChecksum != checksum.getValue()) { 574 return -1; 575 } 576 } 577 return size; 578 } 579 } 580 581 void addToTotalLength(int size) { 582 totalLength.addAndGet(size); 583 } 584 585 public long length() { 586 return totalLength.get(); 587 } 588 589 private void rotateWriteFile() throws IOException { 590 synchronized (dataFileIdLock) { 591 DataFile dataFile = nextDataFile; 592 if (dataFile == null) { 593 dataFile = newDataFile(); 594 } 595 synchronized (currentDataFile) { 596 fileMap.put(dataFile.getDataFileId(), dataFile); 597 fileByFileMap.put(dataFile.getFile(), dataFile); 598 dataFiles.addLast(dataFile); 599 currentDataFile.set(dataFile); 600 } 601 nextDataFile = null; 602 } 603 if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) { 604 preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask); 605 } 606 } 607 608 private Runnable preAllocateNextDataFileTask = new Runnable() { 609 @Override 610 public void run() { 611 if (nextDataFile == null) { 612 synchronized (dataFileIdLock){ 613 try { 614 nextDataFile = newDataFile(); 615 } catch (IOException e) { 616 LOG.warn("Failed to proactively allocate data file", e); 617 } 618 } 619 } 620 } 621 }; 622 623 private volatile Future preAllocateNextDataFileFuture; 624 625 private DataFile newDataFile() throws IOException { 626 int nextNum = nextDataFileId++; 627 File file = getFile(nextNum); 628 DataFile nextWriteFile = new DataFile(file, nextNum); 629 preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile()); 630 return nextWriteFile; 631 } 632 633 634 public DataFile reserveDataFile() { 635 synchronized (dataFileIdLock) { 636 int nextNum = nextDataFileId++; 637 File file = getFile(nextNum); 638 DataFile reservedDataFile = new DataFile(file, nextNum); 639 synchronized (currentDataFile) { 640 fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile); 641 fileByFileMap.put(file, reservedDataFile); 642 if (dataFiles.isEmpty()) { 643 dataFiles.addLast(reservedDataFile); 644 } else { 645 dataFiles.getTail().linkBefore(reservedDataFile); 646 } 647 } 648 return reservedDataFile; 649 } 650 } 651 652 public File getFile(int nextNum) { 653 String fileName = filePrefix + nextNum + fileSuffix; 654 File file = new File(directory, fileName); 655 return file; 656 } 657 658 DataFile getDataFile(Location item) throws IOException { 659 Integer key = Integer.valueOf(item.getDataFileId()); 660 DataFile dataFile = null; 661 synchronized (currentDataFile) { 662 dataFile = fileMap.get(key); 663 } 664 if (dataFile == null) { 665 LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); 666 throw new IOException("Could not locate data file " + getFile(item.getDataFileId())); 667 } 668 return dataFile; 669 } 670 671 public void close() throws IOException { 672 synchronized (this) { 673 if (!started) { 674 return; 675 } 676 cleanupTask.cancel(true); 677 if (preAllocateNextDataFileFuture != null) { 678 preAllocateNextDataFileFuture.cancel(true); 679 } 680 ThreadPoolUtils.shutdownGraceful(scheduler, 4000); 681 accessorPool.close(); 682 } 683 // the appender can be calling back to to the journal blocking a close AMQ-5620 684 appender.close(); 685 synchronized (currentDataFile) { 686 fileMap.clear(); 687 fileByFileMap.clear(); 688 dataFiles.clear(); 689 lastAppendLocation.set(null); 690 started = false; 691 } 692 } 693 694 public synchronized void cleanup() { 695 if (accessorPool != null) { 696 accessorPool.disposeUnused(); 697 } 698 } 699 700 public synchronized boolean delete() throws IOException { 701 702 // Close all open file handles... 703 appender.close(); 704 accessorPool.close(); 705 706 boolean result = true; 707 for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) { 708 DataFile dataFile = i.next(); 709 result &= dataFile.delete(); 710 } 711 712 if (preAllocateNextDataFileFuture != null) { 713 preAllocateNextDataFileFuture.cancel(true); 714 } 715 synchronized (dataFileIdLock) { 716 if (nextDataFile != null) { 717 nextDataFile.delete(); 718 nextDataFile = null; 719 } 720 } 721 722 totalLength.set(0); 723 synchronized (currentDataFile) { 724 fileMap.clear(); 725 fileByFileMap.clear(); 726 lastAppendLocation.set(null); 727 dataFiles = new LinkedNodeList<DataFile>(); 728 } 729 // reopen open file handles... 730 accessorPool = new DataFileAccessorPool(this); 731 appender = new DataFileAppender(this); 732 return result; 733 } 734 735 public void removeDataFiles(Set<Integer> files) throws IOException { 736 for (Integer key : files) { 737 // Can't remove the data file (or subsequent files) that is currently being written to. 738 if (key >= lastAppendLocation.get().getDataFileId()) { 739 continue; 740 } 741 DataFile dataFile = null; 742 synchronized (currentDataFile) { 743 dataFile = fileMap.remove(key); 744 if (dataFile != null) { 745 fileByFileMap.remove(dataFile.getFile()); 746 dataFile.unlink(); 747 } 748 } 749 if (dataFile != null) { 750 forceRemoveDataFile(dataFile); 751 } 752 } 753 } 754 755 private void forceRemoveDataFile(DataFile dataFile) throws IOException { 756 accessorPool.disposeDataFileAccessors(dataFile); 757 totalLength.addAndGet(-dataFile.getLength()); 758 if (archiveDataLogs) { 759 File directoryArchive = getDirectoryArchive(); 760 if (directoryArchive.exists()) { 761 LOG.debug("Archive directory exists: {}", directoryArchive); 762 } else { 763 if (directoryArchive.isAbsolute()) 764 if (LOG.isDebugEnabled()) { 765 LOG.debug("Archive directory [{}] does not exist - creating it now", 766 directoryArchive.getAbsolutePath()); 767 } 768 IOHelper.mkdirs(directoryArchive); 769 } 770 LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath()); 771 dataFile.move(directoryArchive); 772 LOG.debug("Successfully moved data file"); 773 } else { 774 LOG.debug("Deleting data file: {}", dataFile); 775 if (dataFile.delete()) { 776 LOG.debug("Discarded data file: {}", dataFile); 777 } else { 778 LOG.warn("Failed to discard data file : {}", dataFile.getFile()); 779 } 780 } 781 if (dataFileRemovedListener != null) { 782 dataFileRemovedListener.fileRemoved(dataFile); 783 } 784 } 785 786 /** 787 * @return the maxFileLength 788 */ 789 public int getMaxFileLength() { 790 return maxFileLength; 791 } 792 793 /** 794 * @param maxFileLength the maxFileLength to set 795 */ 796 public void setMaxFileLength(int maxFileLength) { 797 this.maxFileLength = maxFileLength; 798 } 799 800 @Override 801 public String toString() { 802 return directory.toString(); 803 } 804 805 public Location getNextLocation(Location location) throws IOException, IllegalStateException { 806 Location cur = null; 807 while (true) { 808 if (cur == null) { 809 if (location == null) { 810 DataFile head = null; 811 synchronized (currentDataFile) { 812 head = dataFiles.getHead(); 813 } 814 if (head == null) { 815 return null; 816 } 817 cur = new Location(); 818 cur.setDataFileId(head.getDataFileId()); 819 cur.setOffset(0); 820 } else { 821 // Set to the next offset.. 822 if (location.getSize() == -1) { 823 cur = new Location(location); 824 } else { 825 cur = new Location(location); 826 cur.setOffset(location.getOffset() + location.getSize()); 827 } 828 } 829 } else { 830 cur.setOffset(cur.getOffset() + cur.getSize()); 831 } 832 833 DataFile dataFile = getDataFile(cur); 834 835 // Did it go into the next file?? 836 if (dataFile.getLength() <= cur.getOffset()) { 837 synchronized (currentDataFile) { 838 dataFile = dataFile.getNext(); 839 } 840 if (dataFile == null) { 841 return null; 842 } else { 843 cur.setDataFileId(dataFile.getDataFileId().intValue()); 844 cur.setOffset(0); 845 } 846 } 847 848 // Load in location size and type. 849 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 850 try { 851 reader.readLocationDetails(cur); 852 } finally { 853 accessorPool.closeDataFileAccessor(reader); 854 } 855 856 Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset()); 857 if (corruptedRange != null) { 858 // skip corruption 859 cur.setSize((int) corruptedRange.range()); 860 } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT || 861 (cur.getType() == 0 && cur.getSize() == 0)) { 862 // eof - jump to next datafile 863 // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for 864 // replay of existing journals 865 // possibly journal is larger than maxFileLength after config change 866 cur.setSize(EOF_RECORD.length); 867 cur.setOffset(Math.max(maxFileLength, dataFile.getLength())); 868 } else if (cur.getType() == USER_RECORD_TYPE) { 869 // Only return user records. 870 return cur; 871 } 872 } 873 } 874 875 public ByteSequence read(Location location) throws IOException, IllegalStateException { 876 DataFile dataFile = getDataFile(location); 877 DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); 878 ByteSequence rc = null; 879 try { 880 rc = reader.readRecord(location); 881 } finally { 882 accessorPool.closeDataFileAccessor(reader); 883 } 884 return rc; 885 } 886 887 public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { 888 Location loc = appender.storeItem(data, Location.USER_TYPE, sync); 889 return loc; 890 } 891 892 public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException { 893 Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete); 894 return loc; 895 } 896 897 public void update(Location location, ByteSequence data, boolean sync) throws IOException { 898 DataFile dataFile = getDataFile(location); 899 DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); 900 try { 901 updater.updateRecord(location, data, sync); 902 } finally { 903 accessorPool.closeDataFileAccessor(updater); 904 } 905 } 906 907 public PreallocationStrategy getPreallocationStrategy() { 908 return preallocationStrategy; 909 } 910 911 public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) { 912 this.preallocationStrategy = preallocationStrategy; 913 } 914 915 public PreallocationScope getPreallocationScope() { 916 return preallocationScope; 917 } 918 919 public void setPreallocationScope(PreallocationScope preallocationScope) { 920 this.preallocationScope = preallocationScope; 921 } 922 923 public File getDirectory() { 924 return directory; 925 } 926 927 public void setDirectory(File directory) { 928 this.directory = directory; 929 } 930 931 public String getFilePrefix() { 932 return filePrefix; 933 } 934 935 public void setFilePrefix(String filePrefix) { 936 this.filePrefix = filePrefix; 937 } 938 939 public Map<WriteKey, WriteCommand> getInflightWrites() { 940 return inflightWrites; 941 } 942 943 public Location getLastAppendLocation() { 944 return lastAppendLocation.get(); 945 } 946 947 public void setLastAppendLocation(Location lastSyncedLocation) { 948 this.lastAppendLocation.set(lastSyncedLocation); 949 } 950 951 public File getDirectoryArchive() { 952 if (!directoryArchiveOverridden && (directoryArchive == null)) { 953 // create the directoryArchive relative to the journal location 954 directoryArchive = new File(directory.getAbsolutePath() + 955 File.separator + DEFAULT_ARCHIVE_DIRECTORY); 956 } 957 return directoryArchive; 958 } 959 960 public void setDirectoryArchive(File directoryArchive) { 961 directoryArchiveOverridden = true; 962 this.directoryArchive = directoryArchive; 963 } 964 965 public boolean isArchiveDataLogs() { 966 return archiveDataLogs; 967 } 968 969 public void setArchiveDataLogs(boolean archiveDataLogs) { 970 this.archiveDataLogs = archiveDataLogs; 971 } 972 973 public DataFile getDataFileById(int dataFileId) { 974 synchronized (currentDataFile) { 975 return fileMap.get(Integer.valueOf(dataFileId)); 976 } 977 } 978 979 public DataFile getCurrentDataFile(int capacity) throws IOException { 980 //First just acquire the currentDataFile lock and return if no rotation needed 981 synchronized (currentDataFile) { 982 if (currentDataFile.get().getLength() + capacity < maxFileLength) { 983 return currentDataFile.get(); 984 } 985 } 986 987 //AMQ-6545 - if rotation needed, acquire dataFileIdLock first to prevent deadlocks 988 //then re-check if rotation is needed 989 synchronized (dataFileIdLock) { 990 synchronized (currentDataFile) { 991 if (currentDataFile.get().getLength() + capacity >= maxFileLength) { 992 rotateWriteFile(); 993 } 994 return currentDataFile.get(); 995 } 996 } 997 } 998 999 public Integer getCurrentDataFileId() { 1000 synchronized (currentDataFile) { 1001 return currentDataFile.get().getDataFileId(); 1002 } 1003 } 1004 1005 /** 1006 * Get a set of files - only valid after start() 1007 * 1008 * @return files currently being used 1009 */ 1010 public Set<File> getFiles() { 1011 synchronized (currentDataFile) { 1012 return fileByFileMap.keySet(); 1013 } 1014 } 1015 1016 public Map<Integer, DataFile> getFileMap() { 1017 synchronized (currentDataFile) { 1018 return new TreeMap<Integer, DataFile>(fileMap); 1019 } 1020 } 1021 1022 public long getDiskSize() { 1023 return totalLength.get(); 1024 } 1025 1026 public void setReplicationTarget(ReplicationTarget replicationTarget) { 1027 this.replicationTarget = replicationTarget; 1028 } 1029 1030 public ReplicationTarget getReplicationTarget() { 1031 return replicationTarget; 1032 } 1033 1034 public String getFileSuffix() { 1035 return fileSuffix; 1036 } 1037 1038 public void setFileSuffix(String fileSuffix) { 1039 this.fileSuffix = fileSuffix; 1040 } 1041 1042 public boolean isChecksum() { 1043 return checksum; 1044 } 1045 1046 public void setChecksum(boolean checksumWrites) { 1047 this.checksum = checksumWrites; 1048 } 1049 1050 public boolean isCheckForCorruptionOnStartup() { 1051 return checkForCorruptionOnStartup; 1052 } 1053 1054 public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) { 1055 this.checkForCorruptionOnStartup = checkForCorruptionOnStartup; 1056 } 1057 1058 public void setWriteBatchSize(int writeBatchSize) { 1059 this.writeBatchSize = writeBatchSize; 1060 } 1061 1062 public int getWriteBatchSize() { 1063 return writeBatchSize; 1064 } 1065 1066 public void setSizeAccumulator(AtomicLong storeSizeAccumulator) { 1067 this.totalLength = storeSizeAccumulator; 1068 } 1069 1070 public void setEnableAsyncDiskSync(boolean val) { 1071 this.enableAsyncDiskSync = val; 1072 } 1073 1074 public boolean isEnableAsyncDiskSync() { 1075 return enableAsyncDiskSync; 1076 } 1077 1078 public JournalDiskSyncStrategy getJournalDiskSyncStrategy() { 1079 return journalDiskSyncStrategy; 1080 } 1081 1082 public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) { 1083 this.journalDiskSyncStrategy = journalDiskSyncStrategy; 1084 } 1085 1086 public boolean isJournalDiskSyncPeriodic() { 1087 return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy); 1088 } 1089 1090 public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) { 1091 this.dataFileRemovedListener = dataFileRemovedListener; 1092 } 1093 1094 public static class WriteCommand extends LinkedNode<WriteCommand> { 1095 public final Location location; 1096 public final ByteSequence data; 1097 final boolean sync; 1098 public final Runnable onComplete; 1099 1100 public WriteCommand(Location location, ByteSequence data, boolean sync) { 1101 this.location = location; 1102 this.data = data; 1103 this.sync = sync; 1104 this.onComplete = null; 1105 } 1106 1107 public WriteCommand(Location location, ByteSequence data, Runnable onComplete) { 1108 this.location = location; 1109 this.data = data; 1110 this.onComplete = onComplete; 1111 this.sync = false; 1112 } 1113 } 1114 1115 public static class WriteKey { 1116 private final int file; 1117 private final long offset; 1118 private final int hash; 1119 1120 public WriteKey(Location item) { 1121 file = item.getDataFileId(); 1122 offset = item.getOffset(); 1123 // TODO: see if we can build a better hash 1124 hash = (int)(file ^ offset); 1125 } 1126 1127 @Override 1128 public int hashCode() { 1129 return hash; 1130 } 1131 1132 @Override 1133 public boolean equals(Object obj) { 1134 if (obj instanceof WriteKey) { 1135 WriteKey di = (WriteKey)obj; 1136 return di.file == file && di.offset == offset; 1137 } 1138 return false; 1139 } 1140 } 1141}