*/
@NonNullByDefault
public interface ByteBufJournal extends AutoCloseable {
+ /**
+ * Return the index of the last entry in the journal.
+ *
+ * @return the last index, or zero if there are no entries.
+ */
+ long lastIndex();
+
/**
* Returns the journal writer.
*
*/
@NonNullByDefault
public interface ByteBufWriter {
- /**
- * Returns the last written index.
- *
- * @return The last written index
- */
- long lastIndex();
-
/**
* Returns the next index to be written.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public interface Journal<E> extends AutoCloseable {
+ /**
+ * Return the index of the last entry in the journal.
+ *
+ * @return the last index, or zero if there are no entries.
+ */
+ long lastIndex();
+
/**
* Returns the journal writer.
*
final class JournalSegment {
private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
+ private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
+ private final AtomicInteger references = new AtomicInteger();
private final JournalSegmentFile file;
private final StorageLevel storageLevel;
private final int maxEntrySize;
private final JournalIndex journalIndex;
- private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
- private final AtomicInteger references = new AtomicInteger();
private JournalSegmentWriter writer;
private boolean open = true;
* @return The last index in the segment.
*/
long lastIndex() {
- return writer.getLastIndex();
+ final var lastPosition = journalIndex.last();
+ return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
}
/**
import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
+import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
import io.atomix.storage.journal.index.Position;
import io.netty.buffer.ByteBuf;
private final FileWriter fileWriter;
final @NonNull JournalSegment segment;
- private final @NonNull JournalIndex index;
+ private final @NonNull JournalIndex journalIndex;
final int maxSegmentSize;
final int maxEntrySize;
private int currentPosition;
JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex index) {
+ final JournalIndex journalIndex) {
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
- this.index = requireNonNull(index);
+ this.journalIndex = requireNonNull(journalIndex);
maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
// adjust lastEntry value
JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
segment = previous.segment;
- index = previous.index;
+ journalIndex = previous.journalIndex;
maxSegmentSize = previous.maxSegmentSize;
maxEntrySize = previous.maxEntrySize;
currentPosition = previous.currentPosition;
this.fileWriter = requireNonNull(fileWriter);
}
- /**
- * Returns the last written index.
- *
- * @return The last written index.
- */
- long getLastIndex() {
- final var last = index.last();
- return last != null ? last.index() : segment.firstIndex() - 1;
- }
-
/**
* Returns the next index to be written.
*
* @return The next index to be written.
*/
- long getNextIndex() {
- final var last = index.last();
- return last != null ? last.index() + 1 : segment.firstIndex();
+ long nextIndex() {
+ final var lastPosition = journalIndex.last();
+ return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
}
/**
Position append(final ByteBuf buf) {
final var length = buf.readableBytes();
if (length > maxEntrySize) {
- throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
- + maxEntrySize + ")");
+ throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
}
// Store the entry index.
- final long index = getNextIndex();
+ final long index = nextIndex();
final int position = currentPosition;
// check space available
// Update the last entry with the correct index/term/length.
currentPosition = nextPosition;
- return this.index.index(index, position);
+ return journalIndex.index(index, position);
}
/**
break;
}
- this.index.index(nextIndex++, currentPosition);
+ journalIndex.index(nextIndex++, currentPosition);
// Update the current position for indexing.
currentPosition += HEADER_BYTES + buf.readableBytes();
*/
void truncate(final long index) {
// If the index is greater than or equal to the last index, skip the truncate.
- if (index >= getLastIndex()) {
+ if (index >= segment.lastIndex()) {
return;
}
// Truncate the index.
- this.index.truncate(index);
+ journalIndex.truncate(index);
if (index < segment.firstIndex()) {
// Reset the writer to the first entry.
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public interface JournalWriter<E> {
- /**
- * Returns the last written index.
- *
- * @return The last written index.
- */
- long getLastIndex();
-
/**
* Returns the next index to be written.
*
.sum();
}
+ @Override
+ public long lastIndex() {
+ return lastSegment().lastIndex();
+ }
+
@Override
public ByteBufWriter writer() {
return writer;
currentWriter = currentSegment.acquireWriter();
}
- @Override
- public long lastIndex() {
- return currentWriter.getLastIndex();
- }
-
@Override
public long nextIndex() {
- return currentWriter.getNextIndex();
+ return currentWriter.nextIndex();
}
@Override
writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
}
+ @Override
+ public long lastIndex() {
+ return journal.lastIndex();
+ }
+
@Override
public JournalWriter<E> writer() {
return writer;
this.mapper = requireNonNull(mapper);
}
- @Override
- public long getLastIndex() {
- return writer.lastIndex();
- }
-
@Override
public long getNextIndex() {
return writer.nextIndex();
JournalWriter<TestEntry> writer = journal.writer();
JournalReader<TestEntry> reader = journal.openReader(1);
- assertEquals(0, writer.getLastIndex());
+ assertEquals(0, journal.lastIndex());
+ assertEquals(1, writer.getNextIndex());
writer.append(ENTRY);
writer.append(ENTRY);
writer.reset(1);
- assertEquals(0, writer.getLastIndex());
+ assertEquals(0, journal.lastIndex());
+ assertEquals(1, writer.getNextIndex());
writer.append(ENTRY);
var indexed = assertNext(reader);
assertEquals(1, indexed.index());
writer.reset(1);
- assertEquals(0, writer.getLastIndex());
+ assertEquals(0, journal.lastIndex());
+ assertEquals(1, writer.getNextIndex());
indexed = writer.append(ENTRY);
- assertEquals(1, writer.getLastIndex());
+ assertEquals(1, journal.lastIndex());
+ assertEquals(2, writer.getNextIndex());
assertEquals(1, indexed.index());
indexed = assertNext(reader);
assertEquals(1, indexed.index());
writer.truncate(0);
- assertEquals(0, writer.getLastIndex());
+ assertEquals(0, journal.lastIndex());
+ assertEquals(1, writer.getNextIndex());
indexed = writer.append(ENTRY);
- assertEquals(1, writer.getLastIndex());
+ assertEquals(1, journal.lastIndex());
+ assertEquals(2, writer.getNextIndex());
assertEquals(1, indexed.index());
indexed = assertNext(reader);
@Override
long lastWrittenSequenceNr() {
- return entries.writer().getLastIndex();
+ return entries.lastIndex();
}
@Override
long writtenBytes = 0;
for (int i = 0; i < count; ++i) {
- final long mark = writer.getLastIndex();
+ final long prevNextIndex = writer.getNextIndex();
final var request = message.getRequest(i);
final var reprs = CollectionConverters.asJava(request.payload());
- LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
+ LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), prevNextIndex);
try {
writtenBytes += writePayload(writer, reprs);
} catch (Exception e) {
- LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e);
+ LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count,
+ prevNextIndex, e);
responses.add(e);
- writer.truncate(mark);
+ writer.reset(prevNextIndex);
continue;
}
responses.add(null);
}
final var sw = Stopwatch.createStarted();
- deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
- .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex())
+ deleteJournal = SegmentedJournal.<Long>builder()
+ .withDirectory(directory)
+ .withName("delete")
+ .withNamespace(DELETE_NAMESPACE)
+ .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+ .build();
+ final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
.tryNext((index, value, length) -> value);
lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();