From: Robert Varga Date: Wed, 18 Jan 2017 15:01:21 +0000 (+0100) Subject: BUG-7556: update version tracking X-Git-Tag: release/carbon~282 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d04b71990a802071a786fe8f0df57bc4adbdec3f BUG-7556: update version tracking This patch adds better version tracking, so it does not rely on calendar time, but rather is monotonically increasing. The 64bit version field is logically split into an incarnation number (31 bits) and a version number (32 bits). Change-Id: Ie0e1f4089cc1ee582037982d9837490348158975 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index ee94c98181..23e6e689b3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -151,6 +151,12 @@ org.opendaylight.yangtools yang-test-util + + org.opendaylight.controller + sal-akka-raft + test-jar + test + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java index c1846b8e69..1d663877c5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java @@ -24,7 +24,9 @@ public class RemoteRpcProviderConfig extends CommonConfig { protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path"; protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path"; protected static final String TAG_ASK_DURATION = "ask-duration"; + private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval"; + private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id"; //locally cached values private Timeout cachedAskDuration; @@ -56,7 +58,10 @@ public class RemoteRpcProviderConfig extends CommonConfig { public String getRpcRegistryPath() { return get().getString(TAG_RPC_REGISTRY_PATH); + } + public String getRpcRegistryPersistenceId() { + return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID); } public String getRpcManagerPath() { @@ -118,6 +123,8 @@ public class RemoteRpcProviderConfig extends CommonConfig { configHolder.put(TAG_ASK_DURATION, "15s"); configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms"); + // persistence + configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry"); } public Builder gossipTickInterval(final String interval) { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index c8415cc818..54f7613209 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -43,7 +43,7 @@ public class RpcRegistry extends BucketStore { private final ActorRef rpcRegistrar; public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { - super(config, new RoutingTable(rpcInvoker)); + super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker)); this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); } @@ -61,19 +61,19 @@ public class RpcRegistry extends BucketStore { } @Override - protected void handleReceive(final Object message) throws Exception { + protected void handleCommand(final Object message) throws Exception { if (message instanceof AddOrUpdateRoutes) { receiveAddRoutes((AddOrUpdateRoutes) message); } else if (message instanceof RemoveRoutes) { receiveRemoveRoutes((RemoveRoutes) message); } else { - super.handleReceive(message); + super.handleCommand(message); } } private void receiveAddRoutes(final AddOrUpdateRoutes msg) { LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers()); - updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers())); + updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers())); } /** @@ -83,7 +83,7 @@ public class RpcRegistry extends BucketStore { */ private void receiveRemoveRoutes(final RemoveRoutes msg) { LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers()); - updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers())); + updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers())); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java index 4e3e5519fb..faa51b90ad 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java @@ -9,11 +9,12 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; import java.util.Optional; +import javax.annotation.Nonnull; public interface Bucket> { long getVersion(); - T getData(); + @Nonnull T getData(); default Optional getWatchActor() { return getData().getWatchActor(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index a927c93533..ade614b8d7 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -7,27 +7,23 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.io.Serializable; -public final class BucketImpl> implements Bucket, Serializable { +final class BucketImpl> implements Bucket, Serializable { private static final long serialVersionUID = 294779770032719196L; - private Long version = System.currentTimeMillis(); + // Guaranteed to be non-null. + // This is kept a Long for binary compatibility of serialization format. + private final Long version; - private T data; + // Guaranteed to be non-null + private final T data; - public BucketImpl(final T data) { - this.data = data; - } - - public BucketImpl(final Bucket other) { - this.version = other.getVersion(); - this.data = other.getData(); - } - - public void setData(final T data) { - this.data = data; - this.version = System.currentTimeMillis() + 1; + BucketImpl(final Long version, final T data) { + this.version = Preconditions.checkNotNull(version); + this.data = Preconditions.checkNotNull(data); } @Override @@ -44,4 +40,10 @@ public final class BucketImpl> implements Bucket, Ser public String toString() { return "BucketImpl{" + "version=" + version + ", data=" + data + '}'; } + + private Object readResolve() { + Verify.verifyNotNull(version); + Verify.verifyNotNull(data); + return this; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index b4af0adfe5..70f4053723 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -8,12 +8,24 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; + import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; +import akka.actor.PoisonPill; import akka.actor.Terminated; import akka.cluster.ClusterActorRefProvider; +import akka.persistence.DeleteSnapshotsFailure; +import akka.persistence.DeleteSnapshotsSuccess; +import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.HashMultimap; import com.google.common.collect.SetMultimap; import java.util.HashMap; @@ -21,9 +33,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; @@ -31,7 +42,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import org.opendaylight.controller.utils.ConditionalProbe; /** * A store that syncs its data across nodes in the cluster. @@ -43,12 +53,7 @@ import org.opendaylight.controller.utils.ConditionalProbe; * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore> extends AbstractUntypedActorWithMetering { - /** - * Bucket owned by the node. - */ - private final BucketImpl localBucket; - +public class BucketStore> extends AbstractUntypedPersistentActorWithMetering { /** * Buckets owned by other known nodes in the cluster. */ @@ -65,19 +70,31 @@ public class BucketStore> extends AbstractUntypedActorWi */ private final SetMultimap watchedActors = HashMultimap.create(1, 1); + private final RemoteRpcProviderConfig config; + private final String persistenceId; + /** * Cluster address for this node. */ private Address selfAddress; - // FIXME: should be part of test-specific subclass - private ConditionalProbe probe; - - private final RemoteRpcProviderConfig config; + /** + * Bucket owned by the node. Initialized during recovery (due to incarnation number). + */ + private LocalBucket localBucket; + private T initialData; + private Integer incarnation; + private boolean persisting; - public BucketStore(final RemoteRpcProviderConfig config, final T initialData) { + public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { this.config = Preconditions.checkNotNull(config); - this.localBucket = new BucketImpl<>(initialData); + this.initialData = Preconditions.checkNotNull(initialData); + this.persistenceId = Preconditions.checkNotNull(persistenceId); + } + + @Override + public String persistenceId() { + return persistenceId; } @Override @@ -92,9 +109,16 @@ public class BucketStore> extends AbstractUntypedActorWi @SuppressWarnings("unchecked") @Override - protected void handleReceive(final Object message) throws Exception { - if (probe != null) { - probe.tell(message, getSelf()); + protected void handleCommand(final Object message) throws Exception { + if (message instanceof GetAllBuckets) { + // GetAllBuckets is used only in testing + receiveGetAllBuckets(); + return; + } + + if (persisting) { + handleSnapshotMessage(message); + return; } if (message instanceof GetBucketsByMembers) { @@ -107,21 +131,57 @@ public class BucketStore> extends AbstractUntypedActorWi removeBucket(((RemoveRemoteBucket) message).getAddress()); } else if (message instanceof Terminated) { actorTerminated((Terminated) message); - } else if (message instanceof GetAllBuckets) { - // GetAllBuckets is used only for unit tests. - receiveGetAllBuckets(); - } else if (message instanceof ConditionalProbe) { - // The ConditionalProbe is only used for unit tests. - LOG.info("Received probe {} {}", getSelf(), message); - probe = (ConditionalProbe) message; - // Send back any message to tell the caller we got the probe. - getSender().tell("Got it", getSelf()); + } else if (message instanceof DeleteSnapshotsSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), message); + } else if (message instanceof DeleteSnapshotsFailure) { + LOG.warn("{}: failed to delete prior snapshots", persistenceId(), + ((DeleteSnapshotsFailure) message).cause()); } else { LOG.debug("Unhandled message [{}]", message); unhandled(message); } } + private void handleSnapshotMessage(final Object message) { + if (message instanceof SaveSnapshotFailure) { + LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause()); + persisting = false; + self().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } else if (message instanceof SaveSnapshotSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), message); + SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message; + deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(), + saved.metadata().timestamp() - 1, 0L, 0L)); + persisting = false; + unstash(); + } else { + LOG.debug("{}: stashing command {}", persistenceId(), message); + stash(); + } + } + + @Override + protected void handleRecover(final Object message) throws Exception { + if (message instanceof RecoveryCompleted) { + if (incarnation != null) { + incarnation = incarnation + 1; + } else { + incarnation = 0; + } + + this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData); + initialData = null; + LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation); + persisting = true; + saveSnapshot(incarnation); + } else if (message instanceof SnapshotOffer) { + incarnation = (Integer) ((SnapshotOffer)message).snapshot(); + LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation); + } else { + LOG.warn("{}: ignoring recovery message {}", persistenceId(), message); + } + } + protected RemoteRpcProviderConfig getConfig() { return config; } @@ -129,7 +189,8 @@ public class BucketStore> extends AbstractUntypedActorWi /** * Returns all the buckets the this node knows about, self owned + remote. */ - void receiveGetAllBuckets() { + @VisibleForTesting + protected void receiveGetAllBuckets() { final ActorRef sender = getSender(); sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf()); } @@ -143,7 +204,7 @@ public class BucketStore> extends AbstractUntypedActorWi Map> all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket - all.put(selfAddress, new BucketImpl<>(localBucket)); + all.put(selfAddress, getLocalBucket().snapshot()); //then get all remote buckets all.putAll(remoteBuckets); @@ -173,7 +234,7 @@ public class BucketStore> extends AbstractUntypedActorWi //first add the local bucket if asked if (members.contains(selfAddress)) { - buckets.put(selfAddress, new BucketImpl<>(localBucket)); + buckets.put(selfAddress, getLocalBucket().snapshot()); } //then get buckets for requested remote nodes @@ -307,13 +368,34 @@ public class BucketStore> extends AbstractUntypedActorWi // Default noop } - public BucketImpl getLocalBucket() { + @VisibleForTesting + protected boolean isPersisting() { + return persisting; + } + + public T getLocalData() { + return getLocalBucket().getData(); + } + + private LocalBucket getLocalBucket() { + Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); return localBucket; } protected void updateLocalBucket(final T data) { - localBucket.setData(data); - versions.put(selfAddress, localBucket.getVersion()); + final LocalBucket local = getLocalBucket(); + final boolean bumpIncarnation = local.setData(data); + versions.put(selfAddress, local.getVersion()); + + if (bumpIncarnation) { + LOG.debug("Version wrapped. incrementing incarnation"); + + Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); + incarnation = incarnation + 1; + + persisting = true; + saveSnapshot(incarnation); + } } public Map> getRemoteBuckets() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java new file mode 100644 index 0000000000..b2c5e0c833 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import com.google.common.base.Preconditions; + +/** + * Local bucket implementation. Unlike a full-blown {@link Bucket}, this class is mutable and tracks when it has been + * changed and when it has been sent anywhere. + * + * @author Robert Varga + */ +final class LocalBucket> { + /* + * Decomposed 64bit signed version number. Always non-negative, hence the most significant bit is always zero. + * - incarnation number (most-significant 31 bits, forming an unsigned int) + * - version number (least-significant 32 bits, treated as unsigned int) + * + * We are keeping a boxed version here, as we stick it into a map anyway. + */ + private Long version; + private T data; + + // We bump versions only if we took a snapshot since last data update + private boolean bumpVersion; + + LocalBucket(final int incarnation, final T data) { + Preconditions.checkArgument(incarnation >= 0); + this.version = ((long)incarnation) << Integer.SIZE; + this.data = Preconditions.checkNotNull(data); + } + + T getData() { + return data; + } + + Long getVersion() { + return version; + } + + Bucket snapshot() { + bumpVersion = true; + return new BucketImpl<>(version, data); + } + + boolean setData(final T data) { + this.data = Preconditions.checkNotNull(data); + if (bumpVersion) { + final long next = version.longValue() + 1; + if ((next & 0xffff_ffffL) == 0) { + return true; + } + + version = next; + bumpVersion = false; + } + return false; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java index 6f81fe8b32..361f5b7e1c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -12,6 +12,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions; @@ -50,7 +52,7 @@ public class Messages { protected ContainsBuckets(final Map> buckets) { Preconditions.checkArgument(buckets != null, "buckets can not be null"); - this.buckets = ImmutableMap.copyOf(buckets); + this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets)); } public final Map> getBuckets() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java index 5fbd91cc81..339d4b1a32 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java @@ -45,7 +45,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Set getGlobalRpc() { - RoutingTable table = rpcRegistry.getLocalBucket().getData(); + RoutingTable table = rpcRegistry.getLocalData(); Set globalRpc = new HashSet<>(table.getRoutes().size()); for (RpcRouter.RouteIdentifier route : table.getRoutes()) { if (route.getRoute() == null) { @@ -59,7 +59,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Set getLocalRegisteredRoutedRpc() { - RoutingTable table = rpcRegistry.getLocalBucket().getData(); + RoutingTable table = rpcRegistry.getLocalData(); Set routedRpc = new HashSet<>(table.getRoutes().size()); for (RpcRouter.RouteIdentifier route : table.getRoutes()) { if (route.getRoute() != null) { @@ -76,7 +76,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot @Override public Map findRpcByName(final String name) { - RoutingTable localTable = rpcRegistry.getLocalBucket().getData(); + RoutingTable localTable = rpcRegistry.getLocalData(); // Get all RPCs from local bucket Map rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT)); @@ -92,8 +92,8 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot } @Override - public Map findRpcByRoute(String routeId) { - RoutingTable localTable = rpcRegistry.getLocalBucket().getData(); + public Map findRpcByRoute(final String routeId) { + RoutingTable localTable = rpcRegistry.getLocalData(); Map rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT)); Map> buckets = rpcRegistry.getRemoteBuckets(); @@ -109,7 +109,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot /** * Search if the routing table route String contains routeName. */ - private Map getRpcMemberMapByRoute(final RoutingTable table, final String routeName, + private static Map getRpcMemberMapByRoute(final RoutingTable table, final String routeName, final String address) { Set> routes = table.getRoutes(); Map rpcMap = new HashMap<>(routes.size()); @@ -130,7 +130,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot /** * Search if the routing table route type contains name. */ - private Map getRpcMemberMapByName(final RoutingTable table, final String name, + private static Map getRpcMemberMapByName(final RoutingTable table, final String name, final String address) { Set> routes = table.getRoutes(); Map rpcMap = new HashMap<>(routes.size()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java index 06781b430d..76e63c4e1d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java @@ -15,6 +15,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.Assert; @@ -32,7 +33,8 @@ public class RemoteRpcProviderTest { @BeforeClass public static void setup() throws InterruptedException { - moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build(); + moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc") + .withConfigReader(ConfigFactory::load).build(); final Config config = moduleConfig.get(); system = ActorSystem.create("odl-cluster-rpc", config); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index 3ceb5c041c..0c3a57e8c9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -80,7 +80,7 @@ public class RpcRegistryTest { @BeforeClass public static void staticSetup() throws InterruptedException { - AkkaConfigurationReader reader = () -> ConfigFactory.load(); + AkkaConfigurationReader reader = ConfigFactory::load; RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms") .withConfigReader(reader).build(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 4e3961aac1..59dd29b314 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -7,22 +7,39 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; +import static akka.actor.ActorRef.noSender; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; +import akka.actor.Status.Success; +import akka.pattern.Patterns; +import akka.persistence.SaveSnapshotSuccess; import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.typesafe.config.ConfigFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; public class BucketStoreTest { @@ -41,6 +58,8 @@ public class BucketStoreTest { private static ActorSystem system; + private JavaTestKit kit; + @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); @@ -52,24 +71,33 @@ public class BucketStoreTest { JavaTestKit.shutdownActorSystem(system); } + @Before + public void before() { + kit = new JavaTestKit(system); + } + + @After + public void after() { + kit.shutdown(system); + } + /** * Given remote buckets, should merge with local copy of remote buckets. */ @Test - public void testReceiveUpdateRemoteBuckets() { - - final BucketStore store = createStore(); + public void testReceiveUpdateRemoteBuckets() throws Exception { + final ActorRef store = createStore(); Address localAddress = system.provider().getDefaultAddress(); - Bucket localBucket = new BucketImpl<>(new T()); + Bucket localBucket = new BucketImpl<>(0L, new T()); Address a1 = new Address("tcp", "system1"); Address a2 = new Address("tcp", "system2"); Address a3 = new Address("tcp", "system3"); - Bucket b1 = new BucketImpl<>(new T()); - Bucket b2 = new BucketImpl<>(new T()); - Bucket b3 = new BucketImpl<>(new T()); + Bucket b1 = new BucketImpl<>(0L, new T()); + Bucket b2 = new BucketImpl<>(0L, new T()); + Bucket b3 = new BucketImpl<>(0L, new T()); Map> remoteBuckets = new HashMap<>(3); remoteBuckets.put(a1, b1); @@ -77,38 +105,40 @@ public class BucketStoreTest { remoteBuckets.put(a3, b3); remoteBuckets.put(localAddress, localBucket); + Await.result(Patterns.ask(store, new WaitUntilDonePersisting(), + Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf()); + //Given remote buckets - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); - //Should NOT contain local bucket - //Should contain ONLY 3 entries i.e a1, a2, a3 - Map> remoteBucketsInStore = store.getRemoteBuckets(); - Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); - Assert.assertTrue(remoteBucketsInStore.size() == 3); + //Should contain local bucket + //Should contain 4 entries i.e a1, a2, a3, local + Map> remoteBucketsInStore = getBuckets(store); + Assert.assertTrue(remoteBucketsInStore.size() == 4); //Add a new remote bucket Address a4 = new Address("tcp", "system4"); - Bucket b4 = new BucketImpl<>(new T()); + Bucket b4 = new BucketImpl<>(0L, new T()); remoteBuckets.clear(); remoteBuckets.put(a4, b4); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should contain a4 - //Should contain 4 entries now i.e a1, a2, a3, a4 - remoteBucketsInStore = store.getRemoteBuckets(); + //Should contain 5 entries now i.e a1, a2, a3, a4, local + remoteBucketsInStore = getBuckets(store); Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4)); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + Assert.assertTrue(remoteBucketsInStore.size() == 5); //Update a bucket - Bucket b3New = new BucketImpl<>(new T()); + Bucket b3New = new BucketImpl<>(0L, new T()); remoteBuckets.clear(); remoteBuckets.put(a3, b3New); remoteBuckets.put(a1, null); remoteBuckets.put(a2, null); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should only update a3 - remoteBucketsInStore = store.getRemoteBuckets(); + remoteBucketsInStore = getBuckets(store); Bucket b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion()); @@ -117,11 +147,11 @@ public class BucketStoreTest { Bucket b2InStore = remoteBucketsInStore.get(a2); Assert.assertEquals(b1.getVersion(), b1InStore.getVersion()); Assert.assertEquals(b2.getVersion(), b2InStore.getVersion()); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + Assert.assertTrue(remoteBucketsInStore.size() == 5); //Should update versions map //versions map contains versions for all remote buckets (4). - Map versionsInStore = store.getVersions(); + Map versionsInStore = getVersions(store); Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2)); @@ -131,13 +161,12 @@ public class BucketStoreTest { //Send older version of bucket remoteBuckets.clear(); remoteBuckets.put(a3, b3); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should NOT update a3 - remoteBucketsInStore = store.getRemoteBuckets(); + remoteBucketsInStore = getBuckets(store); b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion()); - } /** @@ -145,10 +174,63 @@ public class BucketStoreTest { * * @return instance of BucketStore class */ - private static BucketStore createStore() { - final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()), - new T()); - final TestActorRef> testRef = TestActorRef.create(system, props, "testStore"); - return testRef.underlyingActor(); + private ActorRef createStore() { + return kit.childActorOf(Props.create(TestingBucketStore.class, + new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T())); + } + + @SuppressWarnings("unchecked") + private static Map> getBuckets(final ActorRef store) throws Exception { + final GetAllBucketsReply result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(), + Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf()); + return result.getBuckets(); + } + + @SuppressWarnings("unchecked") + private static Map getVersions(final ActorRef store) throws Exception { + return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(), + Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions(); + } + + private static final class TestingBucketStore extends BucketStore { + + private final List toNotify = new ArrayList<>(); + + TestingBucketStore(final RemoteRpcProviderConfig config, + final String persistenceId, + final T initialData) { + super(config, persistenceId, initialData); + } + + @Override + protected void handleCommand(Object message) throws Exception { + if (message instanceof WaitUntilDonePersisting) { + handlePersistAsk(); + } else if (message instanceof SaveSnapshotSuccess) { + super.handleCommand(message); + handleSnapshotSuccess(); + } else { + super.handleCommand(message); + } + } + + private void handlePersistAsk() { + if (isPersisting()) { + toNotify.add(getSender()); + } else { + getSender().tell(new Success(null), noSender()); + } + } + + private void handleSnapshotSuccess() { + toNotify.forEach(ref -> ref.tell(new Success(null), noSender())); + } + } + + /** + * Message sent to the TestingBucketStore that replies with success once the actor is done persisting. + */ + private static final class WaitUntilDonePersisting { + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf index 524e69ffe3..11ad5eceb7 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf @@ -40,6 +40,8 @@ unit-test { akka { loglevel = "DEBUG" #loggers = ["akka.event.slf4j.Slf4jLogger"] + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" } bounded-mailbox { mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" @@ -47,6 +49,17 @@ unit-test { mailbox-capacity = 1000 mailbox-push-timeout-time = 10ms } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } } memberA { @@ -58,6 +71,9 @@ memberA { akka { loglevel = "INFO" loggers = ["akka.event.slf4j.Slf4jLogger"] + + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" actor { provider = "akka.cluster.ClusterActorRefProvider" debug { @@ -82,6 +98,16 @@ memberA { auto-down-unreachable-after = 10s } } + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } } memberB { bounded-mailbox { @@ -93,6 +119,9 @@ memberB { loglevel = "INFO" loggers = ["akka.event.slf4j.Slf4jLogger"] + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + actor { provider = "akka.cluster.ClusterActorRefProvider" debug { @@ -117,6 +146,16 @@ memberB { auto-down-unreachable-after = 10s } } + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } } memberC { bounded-mailbox { @@ -127,6 +166,10 @@ memberC { akka { loglevel = "INFO" loggers = ["akka.event.slf4j.Slf4jLogger"] + + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + actor { provider = "akka.cluster.ClusterActorRefProvider" debug { @@ -151,5 +194,15 @@ memberC { auto-down-unreachable-after = 10s } } + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } }