import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
+import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
import io.atomix.storage.journal.index.Position;
import io.netty.buffer.ByteBuf;
private final FileWriter fileWriter;
final @NonNull JournalSegment segment;
- private final @NonNull JournalIndex index;
+ private final @NonNull JournalIndex journalIndex;
final int maxSegmentSize;
final int maxEntrySize;
private int currentPosition;
JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex index) {
+ final JournalIndex journalIndex) {
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
- this.index = requireNonNull(index);
+ this.journalIndex = requireNonNull(journalIndex);
maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
// adjust lastEntry value
JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
segment = previous.segment;
- index = previous.index;
+ journalIndex = previous.journalIndex;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
currentPosition = previous.currentPosition;
this.fileWriter = requireNonNull(fileWriter);
}
- /**
- * Returns the last written index.
- *
- * @return The last written index.
- */
- long getLastIndex() {
- final var last = index.last();
- return last != null ? last.index() : segment.firstIndex() - 1;
- }
-
/**
* Returns the next index to be written.
*
* @return The next index to be written.
*/
- long getNextIndex() {
- final var last = index.last();
- return last != null ? last.index() + 1 : segment.firstIndex();
+ long nextIndex() {
+ final var lastPosition = journalIndex.last();
+ return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
}
/**
Position append(final ByteBuf buf) {
final var length = buf.readableBytes();
if (length > maxEntrySize) {
- throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
- + maxEntrySize + ")");
+ throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
}
// Store the entry index.
- final long index = getNextIndex();
+ final long index = nextIndex();
final int position = currentPosition;
// check space available
// Update the last entry with the correct index/term/length.
currentPosition = nextPosition;
- return this.index.index(index, position);
+ return journalIndex.index(index, position);
}
/**
break;
}
- this.index.index(nextIndex++, currentPosition);
+ journalIndex.index(nextIndex++, currentPosition);
// Update the current position for indexing.
currentPosition += HEADER_BYTES + buf.readableBytes();
*/
void truncate(final long index) {
// If the index is greater than or equal to the last index, skip the truncate.
- if (index >= getLastIndex()) {
+ if (index >= segment.lastIndex()) {
return;
}
// Truncate the index.
- this.index.truncate(index);
+ journalIndex.truncate(index);
if (index < segment.firstIndex()) {
// Reset the writer to the first entry.