X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreActor.java;h=b494256d500f2fe53f2660c3216ce503a4b09d91;hp=84a70425136d31dd6c3f6c242f6afc4e11d6286a;hb=HEAD;hpb=10427641dc0a75ec62e78ecfc4a7a0a7d438d462 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java index 84a7042513..f155880c01 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java @@ -5,9 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc.registry.gossip; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; @@ -25,8 +27,6 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.SetMultimap; @@ -37,7 +37,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.function.Consumer; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; /** * A store that syncs its data across nodes in the cluster. @@ -72,7 +72,7 @@ public abstract class BucketStoreActor> extends */ private final SetMultimap watchedActors = HashMultimap.create(1, 1); - private final RemoteRpcProviderConfig config; + private final RemoteOpsProviderConfig config; private final String persistenceId; /** @@ -88,10 +88,10 @@ public abstract class BucketStoreActor> extends private Integer incarnation; private boolean persisting; - protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { - this.config = Preconditions.checkNotNull(config); - this.initialData = Preconditions.checkNotNull(initialData); - this.persistenceId = Preconditions.checkNotNull(persistenceId); + protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) { + this.config = requireNonNull(config); + this.initialData = requireNonNull(initialData); + this.persistenceId = requireNonNull(persistenceId); } static ExecuteInActor getBucketsByMembersMessage(final Collection
members) { @@ -154,18 +154,17 @@ public abstract class BucketStoreActor> extends return; } - if (message instanceof ExecuteInActor) { - ((ExecuteInActor) message).accept(this); + if (message instanceof ExecuteInActor execute) { + execute.accept(this); } else if (GET_BUCKET_VERSIONS == message) { // FIXME: do we need to send ourselves? getSender().tell(ImmutableMap.copyOf(versions), getSelf()); - } else if (message instanceof Terminated) { - actorTerminated((Terminated) message); - } else if (message instanceof DeleteSnapshotsSuccess) { - LOG.debug("{}: got command: {}", persistenceId(), message); - } else if (message instanceof DeleteSnapshotsFailure) { - LOG.warn("{}: failed to delete prior snapshots", persistenceId(), - ((DeleteSnapshotsFailure) message).cause()); + } else if (message instanceof Terminated terminated) { + actorTerminated(terminated); + } else if (message instanceof DeleteSnapshotsSuccess deleteSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), deleteSuccess); + } else if (message instanceof DeleteSnapshotsFailure deleteFailure) { + LOG.warn("{}: failed to delete prior snapshots", persistenceId(), deleteFailure.cause()); } else { LOG.debug("Unhandled message [{}]", message); unhandled(message); @@ -173,15 +172,14 @@ public abstract class BucketStoreActor> extends } private void handleSnapshotMessage(final Object message) { - if (message instanceof SaveSnapshotFailure) { - LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause()); + if (message instanceof SaveSnapshotFailure saveFailure) { + LOG.error("{}: failed to persist state", persistenceId(), saveFailure.cause()); persisting = false; self().tell(PoisonPill.getInstance(), ActorRef.noSender()); - } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{}: got command: {}", persistenceId(), message); - SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message; - deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(), - saved.metadata().timestamp() - 1, 0L, 0L)); + } else if (message instanceof SaveSnapshotSuccess saveSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), saveSuccess); + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), saveSuccess.metadata().timestamp() - 1, + 0L, 0L)); persisting = false; unstash(); } else { @@ -191,7 +189,7 @@ public abstract class BucketStoreActor> extends } @Override - protected final void handleRecover(final Object message) throws Exception { + protected final void handleRecover(final Object message) { if (message instanceof RecoveryCompleted) { if (incarnation != null) { incarnation = incarnation + 1; @@ -199,20 +197,20 @@ public abstract class BucketStoreActor> extends incarnation = 0; } - this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData); + this.localBucket = new LocalBucket<>(incarnation, initialData); initialData = null; LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation); persisting = true; saveSnapshot(incarnation); - } else if (message instanceof SnapshotOffer) { - incarnation = (Integer) ((SnapshotOffer)message).snapshot(); + } else if (message instanceof SnapshotOffer snapshotOffer) { + incarnation = (Integer) snapshotOffer.snapshot(); LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation); } else { LOG.warn("{}: ignoring recovery message {}", persistenceId(), message); } } - protected final RemoteRpcProviderConfig getConfig() { + protected final RemoteOpsProviderConfig getConfig() { return config; } @@ -224,7 +222,7 @@ public abstract class BucketStoreActor> extends if (bumpIncarnation) { LOG.debug("Version wrapped. incrementing incarnation"); - Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); + verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); incarnation = incarnation + 1; persisting = true; @@ -377,7 +375,7 @@ public abstract class BucketStoreActor> extends versions.remove(addr); final Bucket bucket = remoteBuckets.remove(addr); if (bucket != null) { - LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); + LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr); onBucketRemoved(addr, bucket); } } @@ -389,7 +387,7 @@ public abstract class BucketStoreActor> extends } private LocalBucket getLocalBucket() { - Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); + checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); return localBucket; } }