final @NonNull JournalSerdes namespace;
final int maxSegmentSize;
final int maxEntrySize;
- final long firstIndex;
// FIXME: hide these two fields
Indexed<E> lastEntry;
this.namespace = requireNonNull(namespace);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
- firstIndex = segment.index();
}
JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
namespace = previous.namespace;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
- firstIndex = previous.firstIndex;
lastEntry = previous.lastEntry;
currentPosition = previous.currentPosition;
}
* @return The last written index.
*/
final long getLastIndex() {
- return lastEntry != null ? lastEntry.index() : firstIndex - 1;
+ return lastEntry != null ? lastEntry.index() : segment.index() - 1;
}
/**
* @return The next index to be written.
*/
final long getNextIndex() {
- return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
+ return lastEntry != null ? lastEntry.index() + 1 : segment.index();
}
/**
abstract JournalSegmentReader<E> reader();
private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
- long nextIndex = firstIndex;
+ long nextIndex = segment.index();
// Clear the buffer indexes and acquire ownership of the buffer
currentPosition = JournalSegmentDescriptor.BYTES;
*
* @param index The index to which to truncate the log.
*/
- abstract void truncate(long index);
+ final void truncate(final long index) {
+ // If the index is greater than or equal to the last index, skip the truncate.
+ if (index >= getLastIndex()) {
+ return;
+ }
+
+ // Reset the last entry.
+ lastEntry = null;
+
+ // Truncate the index.
+ this.index.truncate(index);
+
+ if (index < segment.index()) {
+ // Reset the writer to the first entry.
+ currentPosition = JournalSegmentDescriptor.BYTES;
+ } else {
+ // Reset the writer to the given index.
+ reset(index);
+ }
+
+ // Zero the entry header at current channel position.
+ writeEmptyHeader(currentPosition);
+ }
+
+ /**
+ * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
+ *
+ * @param position position to write to
+ */
+ abstract void writeEmptyHeader(int position);
/**
* Flushes written entries to disk.