private final JournalSegmentReader<E> reader;
private final ByteBuffer buffer;
- private Indexed<E> lastEntry;
- private int currentPosition;
-
DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
final JournalIndex index, final JournalSerdes namespace) {
super(channel, segment, maxEntrySize, index, namespace);
reset(0);
}
- DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
+ DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
super(previous);
buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
reader = new JournalSegmentReader<>(segment,
new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- lastEntry = previous.getLastEntry();
- currentPosition = position;
}
@Override
@Override
MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(this, currentPosition);
+ return new MappedJournalSegmentWriter<>(this);
}
@Override
}
@Override
- void reset(final long index) {
- // acquire ownership of cache and make sure reader does not see anything we've done once we're done
- reader.invalidateCache();
- try {
- resetWithBuffer(index);
- } finally {
- // Make sure reader does not see anything we've done
- reader.invalidateCache();
- }
- }
-
- private void resetWithBuffer(final long index) {
- long nextIndex = firstIndex;
-
- // Clear the buffer indexes and acquire ownership of the buffer
- currentPosition = JournalSegmentDescriptor.BYTES;
- reader.setPosition(JournalSegmentDescriptor.BYTES);
-
- while (index == 0 || nextIndex <= index) {
- final var entry = reader.readEntry(nextIndex);
- if (entry == null) {
- break;
- }
-
- lastEntry = entry;
- this.index.index(nextIndex, currentPosition);
- nextIndex++;
-
- // Update the current position for indexing.
- currentPosition = currentPosition + HEADER_BYTES + entry.size();
- }
- }
-
- @Override
- Indexed<E> getLastEntry() {
- return lastEntry;
+ JournalSegmentReader<E> reader() {
+ return reader;
}
@Override
*/
package io.atomix.storage.journal;
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
import io.atomix.storage.journal.index.JournalIndex;
final int maxEntrySize;
final long firstIndex;
+ // FIXME: hide these two fields
+ Indexed<E> lastEntry;
+ int currentPosition;
+
JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
final JournalIndex index, final JournalSerdes namespace) {
this.channel = requireNonNull(channel);
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
firstIndex = previous.firstIndex;
+ lastEntry = previous.lastEntry;
+ currentPosition = previous.currentPosition;
}
/**
* @return The last written index.
*/
final long getLastIndex() {
- final Indexed<?> lastEntry;
- return (lastEntry = getLastEntry()) != null ? lastEntry.index() : firstIndex - 1;
+ return lastEntry != null ? lastEntry.index() : firstIndex - 1;
}
/**
*
* @return The last entry written.
*/
- abstract Indexed<E> getLastEntry();
+ final Indexed<E> getLastEntry() {
+ return lastEntry;
+ }
/**
* Returns the next index to be written.
* @return The next index to be written.
*/
final long getNextIndex() {
- final Indexed<?> lastEntry;
- return (lastEntry = getLastEntry()) != null ? lastEntry.index() + 1 : firstIndex;
+ return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
}
/**
*
* @param index the index to which to reset the head of the segment
*/
- abstract void reset(long index);
+ final void reset(final long index) {
+ // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+ final var reader = reader();
+ reader.invalidateCache();
+ try {
+ resetWithBuffer(reader, index);
+ } finally {
+ // Make sure reader does not see anything we've done
+ reader.invalidateCache();
+ }
+ }
+
+ abstract JournalSegmentReader<E> reader();
+
+ private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
+ long nextIndex = firstIndex;
+
+ // Clear the buffer indexes and acquire ownership of the buffer
+ currentPosition = JournalSegmentDescriptor.BYTES;
+ reader.setPosition(JournalSegmentDescriptor.BYTES);
+
+ while (index == 0 || nextIndex <= index) {
+ final var entry = reader.readEntry(nextIndex);
+ if (entry == null) {
+ break;
+ }
+
+ lastEntry = entry;
+ this.index.index(nextIndex, currentPosition);
+ nextIndex++;
+
+ // Update the current position for indexing.
+ currentPosition = currentPosition + HEADER_BYTES + entry.size();
+ }
+ }
/**
* Truncates the log to the given index.
*/
final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
private final @NonNull MappedByteBuffer mappedBuffer;
+ private final JournalSegmentReader<E> reader;
private final ByteBuffer buffer;
- private Indexed<E> lastEntry;
- private int currentPosition;
-
- MappedJournalSegmentWriter(
- final FileChannel channel,
- final JournalSegment<E> segment,
- final int maxEntrySize,
- final JournalIndex index,
- final JournalSerdes namespace) {
+ MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
super(channel, segment, maxEntrySize, index, namespace);
+
mappedBuffer = mapBuffer(channel, maxSegmentSize);
buffer = mappedBuffer.slice();
+ reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+ maxEntrySize, namespace);
reset(0);
}
- MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
+ MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
super(previous);
+
mappedBuffer = mapBuffer(channel, maxSegmentSize);
buffer = mappedBuffer.slice();
- currentPosition = position;
- lastEntry = previous.getLastEntry();
+ reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+ maxEntrySize, namespace);
}
private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
@Override
DiskJournalSegmentWriter<E> toFileChannel() {
close();
- return new DiskJournalSegmentWriter<>(this, currentPosition);
- }
-
- @Override
- void reset(final long index) {
- long nextIndex = firstIndex;
-
- // Clear the buffer indexes.
- currentPosition = JournalSegmentDescriptor.BYTES;
-
- int length = buffer.getInt(currentPosition);
-
- // If the length is non-zero, read the entry.
- while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
-
- // Read the checksum of the entry.
- final long checksum = buffer.getInt(currentPosition + Integer.BYTES);
-
- // Slice off the entry's bytes
- final var entryBytes = buffer.slice(currentPosition + SegmentEntry.HEADER_BYTES, length);
-
- // Compute the checksum for the entry bytes.
- final var crc32 = new CRC32();
- crc32.update(entryBytes);
-
- // If the stored checksum does not equal the computed checksum, do not proceed further
- if (checksum != (int) crc32.getValue()) {
- break;
- }
-
- entryBytes.rewind();
- final E entry = namespace.deserialize(entryBytes);
- lastEntry = new Indexed<>(nextIndex, entry, length);
- this.index.index(nextIndex, currentPosition);
- nextIndex++;
-
- // Update the current position for indexing.
- currentPosition = currentPosition + SegmentEntry.HEADER_BYTES + length;
-
- if (currentPosition + SegmentEntry.HEADER_BYTES >= maxSegmentSize) {
- break;
- }
- length = buffer.getInt(currentPosition);
- }
+ return new DiskJournalSegmentWriter<>(this);
}
@Override
- Indexed<E> getLastEntry() {
- return lastEntry;
+ JournalSegmentReader<E> reader() {
+ return reader;
}
@Override