import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
}
private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
- private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
- .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
- .build();
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+ private static final FromByteBufMapper<Long> READ_MAPPER;
+ private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+ static {
+ final var namespace = JournalSerdes.builder()
+ .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+ .build();
+
+ READ_MAPPER = namespace.toReadMapper();
+ WRITE_MAPPER = namespace.toWriteMapper();
+ }
private final String persistenceId;
private final StorageLevel storage;
}
final var sw = Stopwatch.createStarted();
- deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
- .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex())
+ deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+ .withDirectory(directory)
+ .withName("delete")
+ .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+ .build(), READ_MAPPER, WRITE_MAPPER);
+ final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
.tryNext((index, value, length) -> value);
- lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
+ lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);