}
@Override
- void truncate(final long index) {
- // If the index is greater than or equal to the last index, skip the truncate.
- if (index >= getLastIndex()) {
- return;
- }
-
- // Reset the last entry.
- lastEntry = null;
-
- // Truncate the index.
- this.index.truncate(index);
-
+ void writeEmptyHeader(final int position) {
try {
- if (index < firstIndex) {
- // Reset the writer to the first entry.
- currentPosition = JournalSegmentDescriptor.BYTES;
- } else {
- // Reset the writer to the given index.
- reset(index);
- }
-
- // Zero the entry header at current channel position.
- channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition);
+ channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
} catch (IOException e) {
throw new StorageException(e);
}
final @NonNull JournalSerdes namespace;
final int maxSegmentSize;
final int maxEntrySize;
- final long firstIndex;
// FIXME: hide these two fields
Indexed<E> lastEntry;
this.namespace = requireNonNull(namespace);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
- firstIndex = segment.index();
}
JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
namespace = previous.namespace;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
- firstIndex = previous.firstIndex;
lastEntry = previous.lastEntry;
currentPosition = previous.currentPosition;
}
* @return The last written index.
*/
final long getLastIndex() {
- return lastEntry != null ? lastEntry.index() : firstIndex - 1;
+ return lastEntry != null ? lastEntry.index() : segment.index() - 1;
}
/**
* @return The next index to be written.
*/
final long getNextIndex() {
- return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
+ return lastEntry != null ? lastEntry.index() + 1 : segment.index();
}
/**
abstract JournalSegmentReader<E> reader();
private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
- long nextIndex = firstIndex;
+ long nextIndex = segment.index();
// Clear the buffer indexes and acquire ownership of the buffer
currentPosition = JournalSegmentDescriptor.BYTES;
*
* @param index The index to which to truncate the log.
*/
- abstract void truncate(long index);
+ final void truncate(final long index) {
+ // If the index is greater than or equal to the last index, skip the truncate.
+ if (index >= getLastIndex()) {
+ return;
+ }
+
+ // Reset the last entry.
+ lastEntry = null;
+
+ // Truncate the index.
+ this.index.truncate(index);
+
+ if (index < segment.index()) {
+ // Reset the writer to the first entry.
+ currentPosition = JournalSegmentDescriptor.BYTES;
+ } else {
+ // Reset the writer to the given index.
+ reset(index);
+ }
+
+ // Zero the entry header at current channel position.
+ writeEmptyHeader(currentPosition);
+ }
+
+ /**
+ * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
+ *
+ * @param position position to write to
+ */
+ abstract void writeEmptyHeader(int position);
/**
* Flushes written entries to disk.
}
@Override
- void truncate(final long index) {
- // If the index is greater than or equal to the last index, skip the truncate.
- if (index >= getLastIndex()) {
- return;
- }
-
- // Reset the last entry.
- lastEntry = null;
-
- // Truncate the index.
- this.index.truncate(index);
-
- if (index < firstIndex) {
- // Reset the writer to the first entry.
- currentPosition = JournalSegmentDescriptor.BYTES;
- } else {
- // Reset the writer to the given index.
- reset(index);
- }
-
- // Zero the entry header at current buffer position.
- // Note: we issue a single putLong() instead of two putInt()s.
- buffer.putLong(currentPosition, 0L);
+ void writeEmptyHeader(final int position) {
+ // Note: we issue a single putLong() instead of two putInt()s.
+ buffer.putLong(position, 0L);
}
@Override