X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreActor.java;h=b494256d500f2fe53f2660c3216ce503a4b09d91;hp=f627138c67f17db857b46aaa59760ef4488bf06d;hb=HEAD;hpb=fc7746666e4f822074ebe57639e8398bfe9e7951 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java index f627138c67..f155880c01 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreActor.java @@ -1,13 +1,15 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.remote.rpc.registry.gossip; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS; @@ -25,8 +27,6 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.SetMultimap; @@ -37,7 +37,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.function.Consumer; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; /** * A store that syncs its data across nodes in the cluster. @@ -72,7 +72,7 @@ public abstract class BucketStoreActor> extends */ private final SetMultimap watchedActors = HashMultimap.create(1, 1); - private final RemoteRpcProviderConfig config; + private final RemoteOpsProviderConfig config; private final String persistenceId; /** @@ -88,10 +88,10 @@ public abstract class BucketStoreActor> extends private Integer incarnation; private boolean persisting; - protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { - this.config = Preconditions.checkNotNull(config); - this.initialData = Preconditions.checkNotNull(initialData); - this.persistenceId = Preconditions.checkNotNull(persistenceId); + protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) { + this.config = requireNonNull(config); + this.initialData = requireNonNull(initialData); + this.persistenceId = requireNonNull(persistenceId); } static ExecuteInActor getBucketsByMembersMessage(final Collection
members) { @@ -106,6 +106,14 @@ public abstract class BucketStoreActor> extends return actor -> actor.updateRemoteBuckets(buckets); } + static ExecuteInActor getLocalDataMessage() { + return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf()); + } + + static ExecuteInActor getRemoteBucketsMessage() { + return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf()); + } + public final T getLocalData() { return getLocalBucket().getData(); } @@ -146,18 +154,17 @@ public abstract class BucketStoreActor> extends return; } - if (message instanceof ExecuteInActor) { - ((ExecuteInActor) message).accept(this); + if (message instanceof ExecuteInActor execute) { + execute.accept(this); } else if (GET_BUCKET_VERSIONS == message) { // FIXME: do we need to send ourselves? getSender().tell(ImmutableMap.copyOf(versions), getSelf()); - } else if (message instanceof Terminated) { - actorTerminated((Terminated) message); - } else if (message instanceof DeleteSnapshotsSuccess) { - LOG.debug("{}: got command: {}", persistenceId(), message); - } else if (message instanceof DeleteSnapshotsFailure) { - LOG.warn("{}: failed to delete prior snapshots", persistenceId(), - ((DeleteSnapshotsFailure) message).cause()); + } else if (message instanceof Terminated terminated) { + actorTerminated(terminated); + } else if (message instanceof DeleteSnapshotsSuccess deleteSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), deleteSuccess); + } else if (message instanceof DeleteSnapshotsFailure deleteFailure) { + LOG.warn("{}: failed to delete prior snapshots", persistenceId(), deleteFailure.cause()); } else { LOG.debug("Unhandled message [{}]", message); unhandled(message); @@ -165,15 +172,14 @@ public abstract class BucketStoreActor> extends } private void handleSnapshotMessage(final Object message) { - if (message instanceof SaveSnapshotFailure) { - LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause()); + if (message instanceof SaveSnapshotFailure saveFailure) { + LOG.error("{}: failed to persist state", persistenceId(), saveFailure.cause()); persisting = false; self().tell(PoisonPill.getInstance(), ActorRef.noSender()); - } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{}: got command: {}", persistenceId(), message); - SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message; - deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(), - saved.metadata().timestamp() - 1, 0L, 0L)); + } else if (message instanceof SaveSnapshotSuccess saveSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), saveSuccess); + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), saveSuccess.metadata().timestamp() - 1, + 0L, 0L)); persisting = false; unstash(); } else { @@ -183,7 +189,7 @@ public abstract class BucketStoreActor> extends } @Override - protected final void handleRecover(final Object message) throws Exception { + protected final void handleRecover(final Object message) { if (message instanceof RecoveryCompleted) { if (incarnation != null) { incarnation = incarnation + 1; @@ -191,20 +197,20 @@ public abstract class BucketStoreActor> extends incarnation = 0; } - this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData); + this.localBucket = new LocalBucket<>(incarnation, initialData); initialData = null; LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation); persisting = true; saveSnapshot(incarnation); - } else if (message instanceof SnapshotOffer) { - incarnation = (Integer) ((SnapshotOffer)message).snapshot(); + } else if (message instanceof SnapshotOffer snapshotOffer) { + incarnation = (Integer) snapshotOffer.snapshot(); LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation); } else { LOG.warn("{}: ignoring recovery message {}", persistenceId(), message); } } - protected final RemoteRpcProviderConfig getConfig() { + protected final RemoteOpsProviderConfig getConfig() { return config; } @@ -216,7 +222,7 @@ public abstract class BucketStoreActor> extends if (bumpIncarnation) { LOG.debug("Version wrapped. incrementing incarnation"); - Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); + verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); incarnation = incarnation + 1; persisting = true; @@ -230,14 +236,14 @@ public abstract class BucketStoreActor> extends * @param address Remote address * @param bucket Bucket removed */ - protected abstract void onBucketRemoved(final Address address, final Bucket bucket); + protected abstract void onBucketRemoved(Address address, Bucket bucket); /** * Callback to subclasses invoked when the set of remote buckets is updated. * * @param newBuckets Map of address to new bucket. Never null, but can be empty. */ - protected abstract void onBucketsUpdated(final Map> newBuckets); + protected abstract void onBucketsUpdated(Map> newBuckets); /** * Helper to collect all known buckets. @@ -369,7 +375,7 @@ public abstract class BucketStoreActor> extends versions.remove(addr); final Bucket bucket = remoteBuckets.remove(addr); if (bucket != null) { - LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); + LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr); onBucketRemoved(addr, bucket); } } @@ -381,7 +387,7 @@ public abstract class BucketStoreActor> extends } private LocalBucket getLocalBucket() { - Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); + checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); return localBucket; } }