From 7e629cabf95cf30562f10a658c8ebdd724d1b18d Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Fri, 29 Dec 2023 12:58:53 +0100
Subject: [PATCH] Modernize sal-akka-segmented-journal

Use local variable type inference, instanceof patterns and seal
DataJournalEntry.

JIRA: CONTROLLER-2089
Change-Id: I3c2ee2de5eaae6874e2f75a44dfd6fce0942e8ae
Signed-off-by: Robert Varga
---
 .../akka/segjournal/DataJournalEntry.java     |  4 +-
 .../DataJournalEntrySerializer.java           | 25 ++++----
 .../akka/segjournal/DataJournalV0.java        | 60 ++++++++++---------
 .../akka/segjournal/SegmentedFileJournal.java | 24 ++++----
 .../segjournal/SegmentedJournalActor.java     | 19 +++---
 5 files changed, 62 insertions(+), 70 deletions(-)

diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java
index 6899c6e1d6..e0321b35d6 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java
@@ -16,10 +16,8 @@ import io.atomix.storage.journal.JournalSegment;
  * A single entry in the data journal. We do not store {@code persistenceId} for each entry, as that is a
  * journal-invariant, nor do we store {@code sequenceNr}, as that information is maintained by {@link JournalSegment}'s
  * index.
- *
- * @author Robert Varga
  */
-abstract class DataJournalEntry {
+abstract sealed class DataJournalEntry {
     /**
      * A single data journal entry on its way to the backing file.
      */
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java
index f2cfbb4f19..9286749ca5 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java
@@ -7,12 +7,13 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSystem;
 import akka.actor.ExtendedActorSystem;
 import akka.persistence.PersistentRepr;
+import akka.serialization.JavaSerializer;
+import com.google.common.base.VerifyException;
 import io.atomix.storage.journal.JournalSerdes.EntryInput;
 import io.atomix.storage.journal.JournalSerdes.EntryOutput;
 import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
@@ -30,8 +31,6 @@ import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistenc
  * {@link #write(EntryOutput, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
  * around a {@link PersistentRepr}, while {@link #read(EntryInput)} produces an {@link FromPersistence}, which
  * needs further processing to reconstruct a {@link PersistentRepr}.
- *
- * @author Robert Varga
  */
 final class DataJournalEntrySerializer implements EntrySerdes<DataJournalEntry> {
     private final ExtendedActorSystem actorSystem;
@@ -42,19 +41,19 @@
 
     @Override
     public void write(final EntryOutput output, final DataJournalEntry entry) throws IOException {
-        verify(entry instanceof ToPersistence);
-        final PersistentRepr repr = ((ToPersistence) entry).repr();
-        output.writeString(repr.manifest());
-        output.writeString(repr.writerUuid());
-        output.writeObject(repr.payload());
+        if (entry instanceof ToPersistence toPersistence) {
+            final var repr = toPersistence.repr();
+            output.writeString(repr.manifest());
+            output.writeString(repr.writerUuid());
+            output.writeObject(repr.payload());
+        } else {
+            throw new VerifyException("Unexpected entry " + entry);
+        }
     }
 
     @Override
     public DataJournalEntry read(final EntryInput input) throws IOException {
-        final String manifest = input.readString();
-        final String uuid = input.readString();
-        final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
-            (Callable<Object>) input::readObject);
-        return new FromPersistence(manifest, uuid, payload);
+        return new FromPersistence(input.readString(), input.readString(),
+            JavaSerializer.currentSystem().withValue(actorSystem, (Callable<Object>) input::readObject));
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
index 5eb688a615..20761c3241 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
@@ -7,13 +7,10 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
-
 import akka.actor.ActorSystem;
-import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
-import io.atomix.storage.journal.Indexed;
+import com.google.common.base.VerifyException;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.SegmentedJournalReader;
@@ -32,8 +29,6 @@ import scala.jdk.javaapi.CollectionConverters;
 
 /**
  * Version 0 data journal, where every journal entry maps to exactly one segmented file entry.
- *
- * @author Robert Varga
  */
 final class DataJournalV0 extends DataJournal {
     private static final Logger LOG = LoggerFactory.getLogger(DataJournalV0.class);
@@ -75,25 +70,8 @@
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) {
-        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
-            int count = 0;
-            while (reader.hasNext() && count < message.max) {
-                final Indexed<DataJournalEntry> next = reader.next();
-                if (next.index() > message.toSequenceNr) {
-                    break;
-                }
-
-                LOG.trace("{}: replay {}", persistenceId, next);
-                updateLargestSize(next.size());
-                final DataJournalEntry entry = next.entry();
-                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
-                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
-                LOG.debug("{}: replaying {}", persistenceId, repr);
-                message.replayCallback.accept(repr);
-                count++;
-            }
-            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+        try (var reader = entries.openReader(fromSequenceNr)) {
+            handleReplayMessages(reader, message);
         } catch (Exception e) {
             LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
             message.promise.failure(e);
@@ -102,18 +80,42 @@
         }
     }
 
+    private void handleReplayMessages(final SegmentedJournalReader<DataJournalEntry> reader,
+            final ReplayMessages message) {
+        int count = 0;
+        while (reader.hasNext() && count < message.max) {
+            final var next = reader.next();
+            if (next.index() > message.toSequenceNr) {
+                break;
+            }
+
+            LOG.trace("{}: replay {}", persistenceId, next);
+            updateLargestSize(next.size());
+            final var entry = next.entry();
+            if (entry instanceof FromPersistence fromPersistence) {
+                final var repr = fromPersistence.toRepr(persistenceId, next.index());
+                LOG.debug("{}: replaying {}", persistenceId, repr);
+                message.replayCallback.accept(repr);
+                count++;
+            } else {
+                throw new VerifyException("Unexpected entry " + entry);
+            }
+        }
+        LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+    }
+
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
     long handleWriteMessages(final WriteMessages message) {
         final int count = message.size();
-        final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+        final var writer = entries.writer();
         long bytes = 0;
 
         for (int i = 0; i < count; ++i) {
             final long mark = writer.getLastIndex();
-            final AtomicWrite request = message.getRequest(i);
+            final var request = message.getRequest(i);
 
-            final List<PersistentRepr> reprs = CollectionConverters.asJava(request.payload());
+            final var reprs = CollectionConverters.asJava(request.payload());
             LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
             try {
                 bytes += writePayload(writer, reprs);
@@ -132,7 +134,7 @@
     private long writePayload(final SegmentedJournalWriter<DataJournalEntry> writer,
             final List<PersistentRepr> reprs) {
         long bytes = 0;
-        for (PersistentRepr repr : reprs) {
+        for (var repr : reprs) {
             final Object payload = repr.payload();
             if (!(payload instanceof Serializable)) {
                 throw new UnsupportedOperationException("Non-serializable payload encountered "
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
index 617353ec9f..1324067bc8 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
@@ -17,7 +17,6 @@ import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigMemorySize;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -25,7 +24,6 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -39,8 +37,6 @@ import scala.concurrent.Future;
  * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
  * of multiple journals and performs a receptionist job between Akka and individual per-persistenceId actors. See
  * {@link SegmentedJournalActor} for details on how the persistence works.
- *
- * @author Robert Varga
  */
 public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
@@ -80,12 +76,12 @@
 
     @Override
     public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
-        final Map<ActorRef, WriteMessages> map = new HashMap<>();
-        final List<Future<Optional<Exception>>> result = new ArrayList<>();
+        final var map = new HashMap<ActorRef, WriteMessages>();
+        final var result = new ArrayList<Future<Optional<Exception>>>();
 
-        for (AtomicWrite message : messages) {
-            final String persistenceId = message.persistenceId();
-            final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+        for (var message : messages) {
+            final var persistenceId = message.persistenceId();
+            final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
             result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
         }
 
@@ -116,18 +112,18 @@
     }
 
     private ActorRef createHandler(final String persistenceId) {
-        final String directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
-        final File directory = new File(rootDir, directoryName);
+        final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
+        final var directory = new File(rootDir, directoryName);
         LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
 
-        final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+        final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
             maxEntrySize, maxSegmentSize));
         LOG.debug("Directory {} handled by {}", directory, handler);
         return handler;
     }
 
     private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
-        final ActorRef handler = handlers.get(persistenceId);
+        final var handler = handlers.get(persistenceId);
         if (handler == null) {
             return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
         }
@@ -145,7 +141,7 @@
         if (!config.hasPath(path)) {
             return defaultValue;
         }
-        final ConfigMemorySize value = config.getMemorySize(path);
+        final var value = config.getMemorySize(path);
         final long result = value.toBytes();
         checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
         return (int) result;
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
index 264d6a8c06..56afe2bc40 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
@@ -23,7 +23,6 @@ import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
 import java.util.ArrayList;
@@ -57,8 +56,6 @@ import scala.concurrent.Promise;
  * <p>

  * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
  */
 final class SegmentedJournalActor extends AbstractActor {
     abstract static class AsyncMessage<T> {
@@ -104,7 +101,7 @@ final class SegmentedJournalActor extends AbstractActor {
         private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
 
         Future<Optional<Exception>> add(final AtomicWrite write) {
-            final Promise<Optional<Exception>> promise = Promise.apply();
+            final var promise = Promise.<Optional<Exception>>apply();
             requests.add(write);
             results.add(promise);
             return promise.future();
@@ -200,8 +197,8 @@
         LOG.debug("{}: actor starting", persistenceId);
         super.preStart();
 
-        final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
-        final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+        final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+        final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
 
         batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
         messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
@@ -249,8 +246,8 @@
         LOG.debug("{}: deleting entries up to {}", persistenceId, to);
 
         lastDelete = to;
-        final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
-        final Indexed<Long> entry = deleteWriter.append(lastDelete);
+        final var deleteWriter = deleteJournal.writer();
+        final var entry = deleteWriter.append(lastDelete);
         deleteWriter.commit(entry.index());
         dataJournal.deleteTo(lastDelete);
 
@@ -292,7 +289,7 @@
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final Stopwatch sw = Stopwatch.createStarted();
+        final var sw = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
         final long bytes = dataJournal.handleWriteMessages(message);
         sw.stop();
@@ -314,10 +311,10 @@
             return;
         }
 
-        final Stopwatch sw = Stopwatch.createStarted();
+        final var sw = Stopwatch.createStarted();
         deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
             .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
+        final var lastEntry = deleteJournal.writer().getLastEntry();
         lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
-- 
2.36.6
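
Illustration (not part of the patch; it sits after the version trailer, outside the patch proper): for readers new to the idioms this change adopts, the following is a minimal, self-contained sketch of the same pattern, combining a sealed class hierarchy, instanceof patterns and local variable type inference. All names here (Entry, Outbound, Inbound, SealedPatternDemo) are invented for illustration; only VerifyException is real, coming from Guava, which the patch itself uses to guard branches a sealed hierarchy makes impossible.

// Hypothetical sketch of the idioms used by the patch; requires Java 17+ and
// Guava on the classpath for VerifyException.
import com.google.common.base.VerifyException;

// A sealed class names its complete set of subclasses, mirroring how the patch
// seals DataJournalEntry over ToPersistence and FromPersistence.
abstract sealed class Entry permits Entry.Outbound, Entry.Inbound {
    static final class Outbound extends Entry {
        final String manifest;

        Outbound(final String manifest) {
            this.manifest = manifest;
        }
    }

    static final class Inbound extends Entry {
        final Object payload;

        Inbound(final Object payload) {
            this.payload = payload;
        }
    }
}

final class SealedPatternDemo {
    private SealedPatternDemo() {
        // utility class
    }

    static String describe(final Entry entry) {
        // instanceof pattern: the type test and the cast happen in one step,
        // binding the narrowed value to a fresh variable.
        if (entry instanceof Entry.Outbound outbound) {
            return "outbound manifest " + outbound.manifest;
        } else if (entry instanceof Entry.Inbound inbound) {
            return "inbound payload " + inbound.payload;
        } else {
            // Cannot happen while the hierarchy stays sealed; the patch guards
            // such impossible branches with VerifyException the same way.
            throw new VerifyException("Unexpected entry " + entry);
        }
    }

    public static void main(final String[] args) {
        // var: the compiler infers Entry.Outbound from the initializer.
        final var entry = new Entry.Outbound("manifest-1");
        System.out.println(describe(entry));
    }
}

Sealing the base class is what justifies the rewrite in write() and read() above: once DataJournalEntry permits only ToPersistence and FromPersistence, a failed instanceof test can only indicate a programming error, so the patch throws VerifyException instead of attempting to handle it.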