*/
package io.atomix.storage.journal;
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class JournalSegment {
- private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
-
- private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
- private final AtomicInteger references = new AtomicInteger();
- private final JournalSegmentFile file;
- private final StorageLevel storageLevel;
- private final int maxEntrySize;
- private final JournalIndex journalIndex;
-
- private JournalSegmentWriter writer;
- private boolean open = true;
-
- JournalSegment(
- final JournalSegmentFile file,
- final StorageLevel storageLevel,
- final int maxEntrySize,
- final double indexDensity) {
- this.file = requireNonNull(file);
- this.storageLevel = requireNonNull(storageLevel);
- this.maxEntrySize = maxEntrySize;
- journalIndex = new SparseJournalIndex(indexDensity);
-
- final var fileWriter = switch (storageLevel) {
- case DISK -> new DiskFileWriter(file, maxEntrySize);
- case MAPPED -> new MappedFileWriter(file, maxEntrySize);
- };
- writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
- // relinquish mapped memory
- .toFileChannel();
- }
-
- /**
- * Returns the segment's starting index.
- *
- * @return The segment's starting index.
- */
- long firstIndex() {
- return file.firstIndex();
- }
-
- /**
- * Returns the last index in the segment.
- *
- * @return The last index in the segment.
- */
- long lastIndex() {
- final var lastPosition = journalIndex.last();
- return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
- }
-
- /**
- * Returns the segment file.
- *
- * @return The segment file.
- */
- JournalSegmentFile file() {
- return file;
- }
-
- /**
- * Looks up the position of the given index.
- *
- * @param index the index to lookup
- * @return the position of the given index or a lesser index, or {@code null}
- */
- @Nullable Position lookup(final long index) {
- return journalIndex.lookup(index);
- }
-
- /**
- * Acquires a reference to the log segment.
- */
- private void acquire() {
- if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
- writer = writer.toMapped();
- }
- }
-
- /**
- * Releases a reference to the log segment.
- */
- private void release() {
- if (references.decrementAndGet() == 0) {
- if (storageLevel == StorageLevel.MAPPED) {
- writer = writer.toFileChannel();
- }
- if (!open) {
- finishClose();
- }
- }
- }
-
- /**
- * Acquires a reference to the segment writer.
- *
- * @return The segment writer.
- */
- JournalSegmentWriter acquireWriter() {
- checkOpen();
- acquire();
-
- return writer;
- }
-
- /**
- * Releases the reference to the segment writer.
- */
- void releaseWriter() {
- release();
- }
-
- /**
- * Creates a new segment reader.
- *
- * @return A new segment reader.
- */
- JournalSegmentReader createReader() {
- checkOpen();
- acquire();
-
- final var buffer = writer.buffer();
- final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize);
- final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
- reader.setPosition(JournalSegmentDescriptor.BYTES);
- readers.add(reader);
- return reader;
- }
-
- /**
- * Closes a segment reader.
- *
- * @param reader the closed segment reader
- */
- void closeReader(final JournalSegmentReader reader) {
- if (readers.remove(reader)) {
- release();
- }
- }
-
- /**
- * Checks whether the segment is open.
- */
- private void checkOpen() {
- if (!open) {
- throw new IllegalStateException("Segment not open");
- }
- }
-
- /**
- * Returns a boolean indicating whether the segment is open.
- *
- * @return indicates whether the segment is open
- */
- boolean isOpen() {
- return open;
- }
-
- /**
- * Closes the segment.
- */
- void close() {
- if (!open) {
- return;
- }
-
- LOG.debug("Closing segment: {}", this);
- open = false;
- readers.forEach(JournalSegmentReader::close);
- if (references.get() == 0) {
- finishClose();
- }
- }
-
- private void finishClose() {
- writer.close();
- try {
- file.close();
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
- /**
- * Deletes the segment.
- */
- void delete() {
- close();
- LOG.debug("Deleting segment: {}", this);
- try {
- Files.deleteIfExists(file.path());
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("id", file.segmentId())
- .add("version", file.version())
- .add("index", file.firstIndex())
- .toString();
- }
+ private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
+
+ private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
+ private final AtomicInteger references = new AtomicInteger();
+ private final JournalSegmentFile file;
+ private final StorageLevel storageLevel;
+ private final int maxEntrySize;
+ private final JournalIndex journalIndex;
+
+ private JournalSegmentWriter writer;
+ private boolean open = true;
+
+ JournalSegment(
+ final JournalSegmentFile file,
+ final StorageLevel storageLevel,
+ final int maxEntrySize,
+ final double indexDensity) {
+ this.file = requireNonNull(file);
+ this.storageLevel = requireNonNull(storageLevel);
+ this.maxEntrySize = maxEntrySize;
+ journalIndex = new SparseJournalIndex(indexDensity);
+
+ final var fileWriter = switch (storageLevel) {
+ case DISK -> new DiskFileWriter(file, maxEntrySize);
+ case MAPPED -> new MappedFileWriter(file, maxEntrySize);
+ };
+
+ // Traverse all entries and push them to index -- thus reconstructing both last index and current position
+ writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex,
+ indexEntries(fileWriter, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null))
+ // relinquish mapped memory
+ .toFileChannel();
+ }
+
+ /**
+ * Returns the segment's starting index.
+ *
+ * @return The segment's starting index.
+ */
+ long firstIndex() {
+ return file.firstIndex();
+ }
+
+ /**
+ * Returns the last index in the segment.
+ *
+ * @return The last index in the segment.
+ */
+ long lastIndex() {
+ final var lastPosition = journalIndex.last();
+ return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
+ }
+
+ /**
+ * Returns the segment file.
+ *
+ * @return The segment file.
+ */
+ JournalSegmentFile file() {
+ return file;
+ }
+
+ /**
+ * Looks up the position of the given index.
+ *
+ * @param index the index to lookup
+ * @return the position of the given index or a lesser index, or {@code null}
+ */
+ @Nullable Position lookup(final long index) {
+ return journalIndex.lookup(index);
+ }
+
+ /**
+ * Acquires a reference to the log segment.
+ */
+ private void acquire() {
+ if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
+ writer = writer.toMapped();
+ }
+ }
+
+ /**
+ * Releases a reference to the log segment.
+ */
+ private void release() {
+ if (references.decrementAndGet() == 0) {
+ if (storageLevel == StorageLevel.MAPPED) {
+ writer = writer.toFileChannel();
+ }
+ if (!open) {
+ finishClose();
+ }
+ }
+ }
+
+ /**
+ * Acquires a reference to the segment writer.
+ *
+ * @return The segment writer.
+ */
+ JournalSegmentWriter acquireWriter() {
+ checkOpen();
+ acquire();
+
+ return writer;
+ }
+
+ /**
+ * Releases the reference to the segment writer.
+ */
+ void releaseWriter() {
+ release();
+ }
+
+ /**
+ * Creates a new segment reader.
+ *
+ * @return A new segment reader.
+ */
+ JournalSegmentReader createReader() {
+ checkOpen();
+ acquire();
+
+ final var buffer = writer.buffer();
+ final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
+ : new DiskFileReader(file, maxEntrySize);
+ final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
+ reader.setPosition(JournalSegmentDescriptor.BYTES);
+ readers.add(reader);
+ return reader;
+ }
+
+ /**
+ * Closes a segment reader.
+ *
+ * @param reader the closed segment reader
+ */
+ void closeReader(final JournalSegmentReader reader) {
+ if (readers.remove(reader)) {
+ release();
+ }
+ }
+
+ /**
+ * Checks whether the segment is open.
+ */
+ private void checkOpen() {
+ if (!open) {
+ throw new IllegalStateException("Segment not open");
+ }
+ }
+
+ /**
+ * Returns a boolean indicating whether the segment is open.
+ *
+ * @return indicates whether the segment is open
+ */
+ boolean isOpen() {
+ return open;
+ }
+
+ /**
+ * Closes the segment.
+ */
+ void close() {
+ if (!open) {
+ return;
+ }
+
+ LOG.debug("Closing segment: {}", this);
+ open = false;
+ readers.forEach(JournalSegmentReader::close);
+ if (references.get() == 0) {
+ finishClose();
+ }
+ }
+
+ private void finishClose() {
+ writer.close();
+ try {
+ file.close();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ /**
+ * Deletes the segment.
+ */
+ void delete() {
+ close();
+ LOG.debug("Deleting segment: {}", this);
+ try {
+ Files.deleteIfExists(file.path());
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", file.segmentId())
+ .add("version", file.version())
+ .add("index", file.firstIndex())
+ .toString();
+ }
+
+ static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
+ final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+ // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+ final var fileReader = fileWriter.reader();
+ try {
+ return indexEntries(fileReader, segment, maxEntrySize, journalIndex, maxNextIndex, start);
+ } finally {
+ // Make sure reader does not see anything we've done
+ fileReader.invalidateCache();
+ }
+ }
+
+ private static int indexEntries(final FileReader fileReader, final JournalSegment segment, final int maxEntrySize,
+ final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+ int position;
+ long nextIndex;
+ if (start != null) {
+ // look from nearest recovered index
+ nextIndex = start.index();
+ position = start.position();
+ } else {
+ // look from very beginning of the segment
+ nextIndex = segment.firstIndex();
+ position = JournalSegmentDescriptor.BYTES;
+ }
+
+ final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
+ reader.setPosition(position);
+
+ while (nextIndex <= maxNextIndex) {
+ final var buf = reader.readBytes();
+ if (buf == null) {
+ break;
+ }
+
+ journalIndex.index(nextIndex++, position);
+ // Update the current position for indexing.
+ position += HEADER_BYTES + buf.readableBytes();
+ }
+
+ return position;
+ }
}
private int currentPosition;
JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex journalIndex) {
+ final JournalIndex journalIndex, final int currentPosition) {
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
this.journalIndex = requireNonNull(journalIndex);
- maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
-
- // recover currentPosition and lastIndex
- reset(Long.MAX_VALUE, null);
+ this.currentPosition = currentPosition;
+ maxSegmentSize = segment.file().maxSize();
}
JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
// Truncate the index, find nearest indexed entry
final var nearest = journalIndex.truncate(index);
- // recover position and last written
- if (index >= segment.firstIndex()) {
- reset(index, nearest);
- } else {
- currentPosition = JournalSegmentDescriptor.BYTES;
- }
+ currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
+ // recover position and last written
+ : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
// Zero the entry header at current channel position.
fileWriter.writeEmptyHeader(currentPosition);
}
- private void reset(final long maxNextIndex, final @Nullable Position position) {
- // acquire ownership of cache and make sure reader does not see anything we've done once we're done
- final var fileReader = fileWriter.reader();
- try {
- reset(fileReader, maxNextIndex, position);
- } finally {
- // Make sure reader does not see anything we've done
- fileReader.invalidateCache();
- }
- }
-
- private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
- long nextIndex;
- if (position != null) {
- // look from nearest recovered index
- nextIndex = position.index();
- currentPosition = position.position();
- } else {
- // look from very beginning of the segment
- nextIndex = segment.firstIndex();
- currentPosition = JournalSegmentDescriptor.BYTES;
- }
-
- final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
- reader.setPosition(currentPosition);
-
- while (nextIndex <= maxNextIndex) {
- final var buf = reader.readBytes();
- if (buf == null) {
- break;
- }
-
- journalIndex.index(nextIndex++, currentPosition);
- // Update the current position for indexing.
- currentPosition += HEADER_BYTES + buf.readableBytes();
- }
- }
-
/**
* Flushes written entries to disk.
*/