private int currentPosition;
private Long lastIndex;
- private ByteBuf lastWritten;
JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
final JournalIndex index) {
index = previous.index;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
- lastWritten = previous.lastWritten;
lastIndex = previous.lastIndex;
currentPosition = previous.currentPosition;
this.fileWriter = requireNonNull(fileWriter);
*
* @return The last written index.
*/
- final long getLastIndex() {
+ long getLastIndex() {
return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
}
- /**
- * Returns the last data written.
- *
- * @return The last data written.
- */
- final ByteBuf getLastWritten() {
- return lastWritten == null ? null : lastWritten.slice();
- }
-
/**
* Returns the next index to be written.
*
* @return The next index to be written.
*/
- final long getNextIndex() {
+ long getNextIndex() {
return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
}
* @param buf binary data to append
* @return The index of appended data, or {@code null} if segment has no space
*/
- final Long append(final ByteBuf buf) {
+ Long append(final ByteBuf buf) {
final var length = buf.readableBytes();
if (length > maxEntrySize) {
throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
// Update the last entry with the correct index/term/length.
currentPosition = nextPosition;
- lastWritten = buf;
lastIndex = index;
this.index.index(index, position);
*
* @param index the index to which to reset the head of the segment
*/
- final void reset(final long index) {
+ void reset(final long index) {
// acquire ownership of cache and make sure reader does not see anything we've done once we're done
final var fileReader = fileWriter.reader();
try {
break;
}
- lastWritten = buf;
lastIndex = nextIndex;
this.index.index(nextIndex, currentPosition);
nextIndex++;
*
* @param index The index to which to truncate the log.
*/
- final void truncate(final long index) {
+ void truncate(final long index) {
// If the index is greater than or equal to the last index, skip the truncate.
if (index >= getLastIndex()) {
return;
// Reset the last written
lastIndex = null;
- lastWritten = null;
// Truncate the index.
this.index.truncate(index);
assertEquals(1, indexed.index());
writer.reset(1);
assertEquals(0, writer.getLastIndex());
- writer.append(ENTRY);
+ indexed = writer.append(ENTRY);
assertEquals(1, writer.getLastIndex());
- assertEquals(1, writer.getLastEntry().index());
+ assertEquals(1, indexed.index());
indexed = assertNext(reader);
assertEquals(1, indexed.index());
writer.truncate(0);
assertEquals(0, writer.getLastIndex());
- assertNull(writer.getLastEntry());
- writer.append(ENTRY);
+ indexed = writer.append(ENTRY);
assertEquals(1, writer.getLastIndex());
- assertEquals(1, writer.getLastEntry().index());
+ assertEquals(1, indexed.index());
indexed = assertNext(reader);
assertEquals(1, indexed.index());
final var sw = Stopwatch.createStarted();
deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
.withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final var lastEntry = deleteJournal.writer().getLastEntry();
- lastDelete = lastEntry == null ? 0 : lastEntry.entry();
+ final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex())
+ .tryNext((index, value, length) -> value);
+ lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);