Bump versions 9.0.4-SNAPSHOT
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedJournalActor.java
index 73ffab6a053639f89b45470efab8685408efb9a3..0b7808391162327188cc871c69caf75422b88c4c 100644 (file)
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -36,6 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -282,10 +285,18 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
-    private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
-        .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
-        .build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+    private static final FromByteBufMapper<Long> READ_MAPPER;
+    private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+    static {
+        final var namespace = JournalSerdes.builder()
+            .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+            .build();
+
+        READ_MAPPER = namespace.toReadMapper();
+        WRITE_MAPPER = namespace.toWriteMapper();
+    }
 
     private final String persistenceId;
     private final StorageLevel storage;
@@ -494,11 +505,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         final var sw = Stopwatch.createStarted();
-        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
-                .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex())
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("delete")
+            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+            .build(), READ_MAPPER, WRITE_MAPPER);
+        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
             .tryNext((index, value, length) -> value);
-        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);