Modernize sal-akka-segmented-journal 85/109485/6
author Robert Varga <robert.varga@pantheon.tech>
Fri, 29 Dec 2023 11:58:53 +0000 (12:58 +0100)
committer Robert Varga <robert.varga@pantheon.tech>
Sun, 31 Dec 2023 13:56:28 +0000 (14:56 +0100)
Use local variable type inference, instanceof patterns, and seal
DataJournalEntry.

JIRA: CONTROLLER-2089
Change-Id: I3c2ee2de5eaae6874e2f75a44dfd6fce0942e8ae
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
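
The change applies three Java language features across the files below. A condensed, illustrative sketch (process() is a placeholder, not a method from this codebase; the types come from the diffs that follow):

    // 1. Sealing: the compiler now rejects DataJournalEntry subclasses outside its file.
    abstract sealed class DataJournalEntry { /* nested ToPersistence/FromPersistence */ }

    // 2. instanceof pattern: test and cast in one step, failing loudly on anything else.
    if (entry instanceof ToPersistence toPersistence) {
        process(toPersistence.repr());
    } else {
        throw new VerifyException("Unexpected entry " + entry);
    }

    // 3. Local variable type inference: the right-hand side already names the type.
    final var reprs = CollectionConverters.asJava(request.payload());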
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

index 6899c6e1d652d518dd49ec15e1692ac409b51661..e0321b35d621db364ad7a5f932be0939d72d52c2 100644 (file)
@@ -16,10 +16,8 @@ import io.atomix.storage.journal.JournalSegment;
  * A single entry in the data journal. We do not store {@code persistenceId} for each entry, as that is a
  * journal invariant, nor do we store {@code sequenceNr}, as that information is maintained by {@link JournalSegment}'s
  * index.
- *
- * @author Robert Varga
  */
-abstract class DataJournalEntry {
+abstract sealed class DataJournalEntry {
     /**
      * A single data journal entry on its way to the backing file.
      */
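
A hedged sketch of what the sealed hierarchy plausibly looks like after this change; the subclass bodies are reconstructed from their usage in the other diffs of this commit, not shown verbatim here, and assume the usual requireNonNull static import. With both subclasses nested in the same file, no permits clause is needed:

    abstract sealed class DataJournalEntry {
        // An entry on its way to the backing file, wrapping the full PersistentRepr.
        static final class ToPersistence extends DataJournalEntry {
            private final PersistentRepr repr;

            ToPersistence(final PersistentRepr repr) {
                this.repr = requireNonNull(repr);
            }

            PersistentRepr repr() {
                return repr;
            }
        }

        // An entry read back from the file; persistenceId and sequenceNr are
        // journal-level information, re-attached only when toRepr() is called.
        static final class FromPersistence extends DataJournalEntry {
            private final String manifest;
            private final String writerUuid;
            private final Object payload;

            FromPersistence(final String manifest, final String writerUuid, final Object payload) {
                this.manifest = manifest;
                this.writerUuid = writerUuid;
                this.payload = payload;
            }

            PersistentRepr toRepr(final String persistenceId, final long sequenceNr) {
                return PersistentRepr.apply(payload, sequenceNr, persistenceId, manifest, false, null, writerUuid);
            }
        }
    }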
index f2cfbb4f199af38ca7d10eabcfe1557e51d5b3c5..9286749ca50c52ad41a5b6100aa08af985ff287b 100644 (file)
@@ -7,12 +7,13 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSystem;
 import akka.actor.ExtendedActorSystem;
 import akka.persistence.PersistentRepr;
+import akka.serialization.JavaSerializer;
+import com.google.common.base.VerifyException;
 import io.atomix.storage.journal.JournalSerdes.EntryInput;
 import io.atomix.storage.journal.JournalSerdes.EntryOutput;
 import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
@@ -30,8 +31,6 @@ import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistenc
  * {@link #write(EntryOutput, DataJournalEntry)} accepts only the {@link ToPersistence} subclass, which is a wrapper
  * around a {@link PersistentRepr}, while {@link #read(EntryInput)} produces a {@link FromPersistence}, which
  * needs further processing to reconstruct a {@link PersistentRepr}.
- *
- * @author Robert Varga
  */
 final class DataJournalEntrySerializer implements EntrySerdes<DataJournalEntry> {
     private final ExtendedActorSystem actorSystem;
@@ -42,19 +41,19 @@ final class DataJournalEntrySerializer implements EntrySerdes<DataJournalEntry>
 
     @Override
     public void write(final EntryOutput output, final DataJournalEntry entry) throws IOException {
-        verify(entry instanceof ToPersistence);
-        final PersistentRepr repr = ((ToPersistence) entry).repr();
-        output.writeString(repr.manifest());
-        output.writeString(repr.writerUuid());
-        output.writeObject(repr.payload());
+        if (entry instanceof ToPersistence toPersistence) {
+            final var repr = toPersistence.repr();
+            output.writeString(repr.manifest());
+            output.writeString(repr.writerUuid());
+            output.writeObject(repr.payload());
+        } else {
+            throw new VerifyException("Unexpected entry " + entry);
+        }
     }
 
     @Override
     public DataJournalEntry read(final EntryInput input) throws IOException {
-        final String manifest = input.readString();
-        final String uuid = input.readString();
-        final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
-            (Callable<Object>) input::readObject);
-        return new FromPersistence(manifest, uuid, payload);
+        return new FromPersistence(input.readString(), input.readString(),
+            JavaSerializer.currentSystem().withValue(actorSystem, (Callable<Object>) input::readObject));
     }
 }
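
A note on the cast retained in read(): Akka's JavaSerializer.CurrentSystem exposes both Scala's withValue(Function0) and a Java-friendly withValue(Callable) overload, and the cast steers the method reference to the latter so it compiles. A standalone sketch, assuming system is the ExtendedActorSystem and input the EntryInput from above:

    // The thread-local "current system" is what JavaSerializer.fromBinary()
    // consults while deserializing the payload object.
    final Object payload = JavaSerializer.currentSystem().withValue(system,
        (Callable<Object>) input::readObject);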
index 5eb688a6150ac1248a119090e7b9e8c4034619ff..20761c32416558d43f28eb32cf05d28edea92301 100644 (file)
@@ -7,13 +7,10 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
-
 import akka.actor.ActorSystem;
-import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
-import io.atomix.storage.journal.Indexed;
+import com.google.common.base.VerifyException;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.SegmentedJournalReader;
@@ -32,8 +29,6 @@ import scala.jdk.javaapi.CollectionConverters;
 
 /**
  * Version 0 data journal, where every journal entry maps to exactly one segmented file entry.
- *
- * @author Robert Varga
  */
 final class DataJournalV0 extends DataJournal {
     private static final Logger LOG = LoggerFactory.getLogger(DataJournalV0.class);
@@ -75,25 +70,8 @@ final class DataJournalV0 extends DataJournal {
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) {
-        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
-            int count = 0;
-            while (reader.hasNext() && count < message.max) {
-                final Indexed<DataJournalEntry> next = reader.next();
-                if (next.index() > message.toSequenceNr) {
-                    break;
-                }
-
-                LOG.trace("{}: replay {}", persistenceId, next);
-                updateLargestSize(next.size());
-                final DataJournalEntry entry = next.entry();
-                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
-                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
-                LOG.debug("{}: replaying {}", persistenceId, repr);
-                message.replayCallback.accept(repr);
-                count++;
-            }
-            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+        try (var reader = entries.openReader(fromSequenceNr)) {
+            handleReplayMessages(reader, message);
         } catch (Exception e) {
             LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
             message.promise.failure(e);
@@ -102,18 +80,42 @@ final class DataJournalV0 extends DataJournal {
         }
     }
 
+    private void handleReplayMessages(final SegmentedJournalReader<DataJournalEntry> reader,
+            final ReplayMessages message) {
+        int count = 0;
+        while (reader.hasNext() && count < message.max) {
+            final var next = reader.next();
+            if (next.index() > message.toSequenceNr) {
+                break;
+            }
+
+            LOG.trace("{}: replay {}", persistenceId, next);
+            updateLargestSize(next.size());
+            final var entry = next.entry();
+            if (entry instanceof FromPersistence fromPersistence) {
+                final var repr = fromPersistence.toRepr(persistenceId, next.index());
+                LOG.debug("{}: replaying {}", persistenceId, repr);
+                message.replayCallback.accept(repr);
+                count++;
+            } else {
+                throw new VerifyException("Unexpected entry " + entry);
+            }
+        }
+        LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+    }
+
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     long handleWriteMessages(final WriteMessages message) {
         final int count = message.size();
-        final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+        final var writer = entries.writer();
         long bytes = 0;
 
         for (int i = 0; i < count; ++i) {
             final long mark = writer.getLastIndex();
-            final AtomicWrite request = message.getRequest(i);
+            final var request = message.getRequest(i);
 
-            final List<PersistentRepr> reprs = CollectionConverters.asJava(request.payload());
+            final var reprs = CollectionConverters.asJava(request.payload());
             LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
             try {
                 bytes += writePayload(writer, reprs);
@@ -132,7 +134,7 @@ final class DataJournalV0 extends DataJournal {
 
     private long writePayload(final SegmentedJournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
         long bytes = 0;
-        for (PersistentRepr repr : reprs) {
+        for (var repr : reprs) {
             final Object payload = repr.payload();
             if (!(payload instanceof Serializable)) {
                 throw new UnsupportedOperationException("Non-serializable payload encountered "
index 617353ec9fc4757c6afaf38c87729e4be867bcbb..1324067bc8c71a47c222f53318a33c9e30282a86 100644 (file)
@@ -17,7 +17,6 @@ import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigMemorySize;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -25,7 +24,6 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -39,8 +37,6 @@ import scala.concurrent.Future;
  * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents an aggregation
  * of multiple journals and performs a receptionist job between Akka and individual per-persistenceId actors. See
  * {@link SegmentedJournalActor} for details on how the persistence works.
- *
- * @author Robert Varga
  */
 public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
@@ -80,12 +76,12 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
-        final Map<ActorRef, WriteMessages> map = new HashMap<>();
-        final List<Future<Optional<Exception>>> result = new ArrayList<>();
+        final var map = new HashMap<ActorRef, WriteMessages>();
+        final var result = new ArrayList<Future<Optional<Exception>>>();
 
-        for (AtomicWrite message : messages) {
-            final String persistenceId = message.persistenceId();
-            final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+        for (var message : messages) {
+            final var persistenceId = message.persistenceId();
+            final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
             result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
         }
 
@@ -116,18 +112,18 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     }
 
     private ActorRef createHandler(final String persistenceId) {
-        final String directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
-        final File directory = new File(rootDir, directoryName);
+        final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
+        final var directory = new File(rootDir, directoryName);
         LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
 
-        final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+        final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
             maxEntrySize, maxSegmentSize));
         LOG.debug("Directory {} handled by {}", directory, handler);
         return handler;
     }
 
     private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
-        final ActorRef handler = handlers.get(persistenceId);
+        final var handler = handlers.get(persistenceId);
         if (handler == null) {
             return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
         }
@@ -145,7 +141,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
         if (!config.hasPath(path)) {
             return defaultValue;
         }
-        final ConfigMemorySize value = config.getMemorySize(path);
+        final var value = config.getMemorySize(path);
         final long result = value.toBytes();
         checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", result, Integer.MAX_VALUE);
         return (int) result;
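
For reference, the memory-size options parsed above follow Typesafe Config's standard suffix syntax; a minimal standalone sketch (the "max-entry-size" key name is an assumption mirroring the root-directory constant above):

    import com.typesafe.config.ConfigFactory;

    final var config = ConfigFactory.parseString("max-entry-size = 16M");
    final var size = config.getMemorySize("max-entry-size");
    System.out.println(size.toBytes()); // 16777216 (HOCON "M" is mebibytes)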
index 264d6a8c060fbc138cc2de52764a2c7f971b639c..56afe2bc40dacdaa5bcfbaf0cafef383a76f08d6 100644 (file)
@@ -23,7 +23,6 @@ import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
 import java.util.ArrayList;
@@ -57,8 +56,6 @@ import scala.concurrent.Promise;
  * <p>
  * The split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
  */
 final class SegmentedJournalActor extends AbstractActor {
     abstract static class AsyncMessage<T> {
@@ -104,7 +101,7 @@ final class SegmentedJournalActor extends AbstractActor {
         private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
 
         Future<Optional<Exception>> add(final AtomicWrite write) {
-            final Promise<Optional<Exception>> promise = Promise.apply();
+            final var promise = Promise.<Optional<Exception>>apply();
             requests.add(write);
             results.add(promise);
             return promise.future();
@@ -200,8 +197,8 @@ final class SegmentedJournalActor extends AbstractActor {
         LOG.debug("{}: actor starting", persistenceId);
         super.preStart();
 
-        final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
-        final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+        final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+        final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
 
         batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
         messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
@@ -249,8 +246,8 @@ final class SegmentedJournalActor extends AbstractActor {
             LOG.debug("{}: deleting entries up to {}", persistenceId, to);
 
             lastDelete = to;
-            final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
-            final Indexed<Long> entry = deleteWriter.append(lastDelete);
+            final var deleteWriter = deleteJournal.writer();
+            final var entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
             dataJournal.deleteTo(lastDelete);
 
@@ -292,7 +289,7 @@ final class SegmentedJournalActor extends AbstractActor {
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final Stopwatch sw = Stopwatch.createStarted();
+        final var sw = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
         final long bytes = dataJournal.handleWriteMessages(message);
         sw.stop();
@@ -314,10 +311,10 @@ final class SegmentedJournalActor extends AbstractActor {
             return;
         }
 
-        final Stopwatch sw = Stopwatch.createStarted();
+        final var sw = Stopwatch.createStarted();
         deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
                 .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
+        final var lastEntry = deleteJournal.writer().getLastEntry();
         lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
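
The handleWriteMessages() hunk earlier stops right after sw.stop(); a plausible but hypothetical continuation, using the batchWriteTime/messageWriteCount metrics registered in preStart() and standard Dropwizard Metrics calls — the real lines fall outside this diff's context:

    // Record how long the batch took and how many sequence numbers it advanced.
    batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
    messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);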