* Segmented journal.
*/
public final class SegmentedJournal<E> implements Journal<E> {
-
/**
* Returns a new Raft log builder.
*
this.indexDensity = indexDensity;
this.flushOnCommit = flushOnCommit;
open();
- this.writer = openWriter();
+ this.writer = new SegmentedJournalWriter<>(this);
}
/**
}
@Override
- public SegmentedJournalWriter<E> writer() {
+ public JournalWriter<E> writer() {
return writer;
}
@Override
- public SegmentedJournalReader<E> openReader(long index) {
- return openReader(index, SegmentedJournalReader.Mode.ALL);
+ public JournalReader<E> openReader(long index) {
+ return openReader(index, JournalReader.Mode.ALL);
}
/**
* @param mode The mode in which to read entries.
* @return The Raft log reader.
*/
- public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
- SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
+ public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
+ final var reader = new SegmentedJournalReader<>(this, index, mode);
readers.add(reader);
return reader;
}
- /**
- * Opens a new journal writer.
- *
- * @return A new journal writer.
- */
- protected SegmentedJournalWriter<E> openWriter() {
- return new SegmentedJournalWriter<>(this);
- }
-
/**
* Opens the segments.
*/
/**
* Raft log reader.
*/
-public final class SegmentedJournalReader<E> implements JournalReader<E> {
+final class SegmentedJournalReader<E> implements JournalReader<E> {
private final SegmentedJournal<E> journal;
private JournalSegment<E> currentSegment;
private Indexed<E> previousEntry;
/**
* Raft log writer.
*/
-public final class SegmentedJournalWriter<E> implements JournalWriter<E> {
+final class SegmentedJournalWriter<E> implements JournalWriter<E> {
private final SegmentedJournal<E> journal;
private JournalSegment<E> currentSegment;
private JournalSegmentWriter<E> currentWriter;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import com.google.common.base.VerifyException;
+import io.atomix.storage.journal.JournalReader;
import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.JournalWriter;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
-import io.atomix.storage.journal.SegmentedJournalWriter;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.io.Serializable;
}
}
- private void handleReplayMessages(final SegmentedJournalReader<DataJournalEntry> reader,
- final ReplayMessages message) {
+ private void handleReplayMessages(final JournalReader<DataJournalEntry> reader, final ReplayMessages message) {
int count = 0;
while (reader.hasNext() && count < message.max) {
final var next = reader.next();
return bytes;
}
- private long writePayload(final SegmentedJournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
+ private long writePayload(final JournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
long bytes = 0;
for (var repr : reprs) {
final Object payload = repr.payload();