<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-test-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-akka-raft</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
protected static final String TAG_ASK_DURATION = "ask-duration";
+
private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+ private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id";
//locally cached values
private Timeout cachedAskDuration;
public String getRpcRegistryPath() {
return get().getString(TAG_RPC_REGISTRY_PATH);
+ }
+ public String getRpcRegistryPersistenceId() {
+ return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID);
}
public String getRpcManagerPath() {
configHolder.put(TAG_ASK_DURATION, "15s");
configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
+ // persistence
+ configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry");
}
public Builder gossipTickInterval(final String interval) {
private final ActorRef rpcRegistrar;
public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
- super(config, new RoutingTable(rpcInvoker));
+ super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker));
this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
}
}
@Override
- protected void handleReceive(final Object message) throws Exception {
+ protected void handleCommand(final Object message) throws Exception {
if (message instanceof AddOrUpdateRoutes) {
receiveAddRoutes((AddOrUpdateRoutes) message);
} else if (message instanceof RemoveRoutes) {
receiveRemoveRoutes((RemoveRoutes) message);
} else {
- super.handleReceive(message);
+ super.handleCommand(message);
}
}
private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
- updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers()));
+ updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
}
/**
*/
private void receiveRemoveRoutes(final RemoveRoutes msg) {
LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
- updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers()));
+ updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
}
@Override
import akka.actor.ActorRef;
import java.util.Optional;
+import javax.annotation.Nonnull;
public interface Bucket<T extends BucketData<T>> {
long getVersion();
- T getData();
+ @Nonnull T getData();
default Optional<ActorRef> getWatchActor() {
return getData().getWatchActor();
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import java.io.Serializable;
-public final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
+final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
private static final long serialVersionUID = 294779770032719196L;
- private Long version = System.currentTimeMillis();
+ // Guaranteed to be non-null.
+ // This is kept a Long for binary compatibility of serialization format.
+ private final Long version;
- private T data;
+ // Guaranteed to be non-null
+ private final T data;
- public BucketImpl(final T data) {
- this.data = data;
- }
-
- public BucketImpl(final Bucket<T> other) {
- this.version = other.getVersion();
- this.data = other.getData();
- }
-
- public void setData(final T data) {
- this.data = data;
- this.version = System.currentTimeMillis() + 1;
+ BucketImpl(final Long version, final T data) {
+ this.version = Preconditions.checkNotNull(version);
+ this.data = Preconditions.checkNotNull(data);
}
@Override
public String toString() {
return "BucketImpl{" + "version=" + version + ", data=" + data + '}';
}
+
+ private Object readResolve() {
+ Verify.verifyNotNull(version);
+ Verify.verifyNotNull(data);
+ return this;
+ }
}
package org.opendaylight.controller.remote.rpc.registry.gossip;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
+import akka.actor.PoisonPill;
import akka.actor.Terminated;
import akka.cluster.ClusterActorRefProvider;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.utils.ConditionalProbe;
/**
* A store that syncs its data across nodes in the cluster.
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
- /**
- * Bucket owned by the node.
- */
- private final BucketImpl<T> localBucket;
-
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
/**
* Buckets owned by other known nodes in the cluster.
*/
*/
private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
+ private final RemoteRpcProviderConfig config;
+ private final String persistenceId;
+
/**
* Cluster address for this node.
*/
private Address selfAddress;
- // FIXME: should be part of test-specific subclass
- private ConditionalProbe probe;
-
- private final RemoteRpcProviderConfig config;
+ /**
+ * Bucket owned by the node. Initialized during recovery (due to incarnation number).
+ */
+ private LocalBucket<T> localBucket;
+ private T initialData;
+ private Integer incarnation;
+ private boolean persisting;
- public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
+ public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
this.config = Preconditions.checkNotNull(config);
- this.localBucket = new BucketImpl<>(initialData);
+ this.initialData = Preconditions.checkNotNull(initialData);
+ this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ }
+
+ @Override
+ public String persistenceId() {
+ return persistenceId;
}
@Override
@SuppressWarnings("unchecked")
@Override
- protected void handleReceive(final Object message) throws Exception {
- if (probe != null) {
- probe.tell(message, getSelf());
+ protected void handleCommand(final Object message) throws Exception {
+ if (message instanceof GetAllBuckets) {
+ // GetAllBuckets is used only in testing
+ receiveGetAllBuckets();
+ return;
+ }
+
+ if (persisting) {
+ handleSnapshotMessage(message);
+ return;
}
if (message instanceof GetBucketsByMembers) {
removeBucket(((RemoveRemoteBucket) message).getAddress());
} else if (message instanceof Terminated) {
actorTerminated((Terminated) message);
- } else if (message instanceof GetAllBuckets) {
- // GetAllBuckets is used only for unit tests.
- receiveGetAllBuckets();
- } else if (message instanceof ConditionalProbe) {
- // The ConditionalProbe is only used for unit tests.
- LOG.info("Received probe {} {}", getSelf(), message);
- probe = (ConditionalProbe) message;
- // Send back any message to tell the caller we got the probe.
- getSender().tell("Got it", getSelf());
+ } else if (message instanceof DeleteSnapshotsSuccess) {
+ LOG.debug("{}: got command: {}", persistenceId(), message);
+ } else if (message instanceof DeleteSnapshotsFailure) {
+ LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
+ ((DeleteSnapshotsFailure) message).cause());
} else {
LOG.debug("Unhandled message [{}]", message);
unhandled(message);
}
}
+ private void handleSnapshotMessage(final Object message) {
+ if (message instanceof SaveSnapshotFailure) {
+ LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
+ persisting = false;
+ self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ } else if (message instanceof SaveSnapshotSuccess) {
+ LOG.debug("{}: got command: {}", persistenceId(), message);
+ SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
+ deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
+ saved.metadata().timestamp() - 1, 0L, 0L));
+ persisting = false;
+ unstash();
+ } else {
+ LOG.debug("{}: stashing command {}", persistenceId(), message);
+ stash();
+ }
+ }
+
+ @Override
+ protected void handleRecover(final Object message) throws Exception {
+ if (message instanceof RecoveryCompleted) {
+ if (incarnation != null) {
+ incarnation = incarnation + 1;
+ } else {
+ incarnation = 0;
+ }
+
+ this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
+ initialData = null;
+ LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
+ persisting = true;
+ saveSnapshot(incarnation);
+ } else if (message instanceof SnapshotOffer) {
+ incarnation = (Integer) ((SnapshotOffer)message).snapshot();
+ LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
+ } else {
+ LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
+ }
+ }
+
protected RemoteRpcProviderConfig getConfig() {
return config;
}
/**
* Returns all the buckets the this node knows about, self owned + remote.
*/
- void receiveGetAllBuckets() {
+ @VisibleForTesting
+ protected void receiveGetAllBuckets() {
final ActorRef sender = getSender();
sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
}
Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
- all.put(selfAddress, new BucketImpl<>(localBucket));
+ all.put(selfAddress, getLocalBucket().snapshot());
//then get all remote buckets
all.putAll(remoteBuckets);
//first add the local bucket if asked
if (members.contains(selfAddress)) {
- buckets.put(selfAddress, new BucketImpl<>(localBucket));
+ buckets.put(selfAddress, getLocalBucket().snapshot());
}
//then get buckets for requested remote nodes
// Default noop
}
- public BucketImpl<T> getLocalBucket() {
+ @VisibleForTesting
+ protected boolean isPersisting() {
+ return persisting;
+ }
+
+ public T getLocalData() {
+ return getLocalBucket().getData();
+ }
+
+ private LocalBucket<T> getLocalBucket() {
+ Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
return localBucket;
}
protected void updateLocalBucket(final T data) {
- localBucket.setData(data);
- versions.put(selfAddress, localBucket.getVersion());
+ final LocalBucket<T> local = getLocalBucket();
+ final boolean bumpIncarnation = local.setData(data);
+ versions.put(selfAddress, local.getVersion());
+
+ if (bumpIncarnation) {
+ LOG.debug("Version wrapped. incrementing incarnation");
+
+ Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+ incarnation = incarnation + 1;
+
+ persisting = true;
+ saveSnapshot(incarnation);
+ }
}
public Map<Address, Bucket<T>> getRemoteBuckets() {
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Local bucket implementation. Unlike a full-blown {@link Bucket}, this class is mutable and tracks when it has been
+ * changed and when it has been sent anywhere.
+ *
+ * @author Robert Varga
+ */
+final class LocalBucket<T extends BucketData<T>> {
+ /*
+ * Decomposed 64bit signed version number. Always non-negative, hence the most significant bit is always zero.
+ * - incarnation number (most-significant 31 bits, forming an unsigned int)
+ * - version number (least-significant 32 bits, treated as unsigned int)
+ *
+ * We are keeping a boxed version here, as we stick it into a map anyway.
+ */
+ private Long version;
+ private T data;
+
+ // We bump versions only if we took a snapshot since last data update
+ private boolean bumpVersion;
+
+ LocalBucket(final int incarnation, final T data) {
+ Preconditions.checkArgument(incarnation >= 0);
+ this.version = ((long)incarnation) << Integer.SIZE;
+ this.data = Preconditions.checkNotNull(data);
+ }
+
+ T getData() {
+ return data;
+ }
+
+ Long getVersion() {
+ return version;
+ }
+
+ Bucket<T> snapshot() {
+ bumpVersion = true;
+ return new BucketImpl<>(version, data);
+ }
+
+ boolean setData(final T data) {
+ this.data = Preconditions.checkNotNull(data);
+ if (bumpVersion) {
+ final long next = version.longValue() + 1;
+ if ((next & 0xffff_ffffL) == 0) {
+ return true;
+ }
+
+ version = next;
+ bumpVersion = false;
+ }
+ return false;
+ }
+}
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
Preconditions.checkArgument(buckets != null, "buckets can not be null");
- this.buckets = ImmutableMap.copyOf(buckets);
+ this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets));
}
public final Map<Address, Bucket<T>> getBuckets() {
@Override
public Set<String> getGlobalRpc() {
- RoutingTable table = rpcRegistry.getLocalBucket().getData();
+ RoutingTable table = rpcRegistry.getLocalData();
Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
if (route.getRoute() == null) {
@Override
public Set<String> getLocalRegisteredRoutedRpc() {
- RoutingTable table = rpcRegistry.getLocalBucket().getData();
+ RoutingTable table = rpcRegistry.getLocalData();
Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
if (route.getRoute() != null) {
@Override
public Map<String, String> findRpcByName(final String name) {
- RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+ RoutingTable localTable = rpcRegistry.getLocalData();
// Get all RPCs from local bucket
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
}
@Override
- public Map<String, String> findRpcByRoute(String routeId) {
- RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+ public Map<String, String> findRpcByRoute(final String routeId) {
+ RoutingTable localTable = rpcRegistry.getLocalData();
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
/**
* Search if the routing table route String contains routeName.
*/
- private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
+ private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
final String address) {
Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
Map<String, String> rpcMap = new HashMap<>(routes.size());
/**
* Search if the routing table route type contains name.
*/
- private Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
+ private static Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
final String address) {
Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
Map<String, String> rpcMap = new HashMap<>(routes.size());
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
@BeforeClass
public static void setup() throws InterruptedException {
- moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+ moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc")
+ .withConfigReader(ConfigFactory::load).build();
final Config config = moduleConfig.get();
system = ActorSystem.create("odl-cluster-rpc", config);
@BeforeClass
public static void staticSetup() throws InterruptedException {
- AkkaConfigurationReader reader = () -> ConfigFactory.load();
+ AkkaConfigurationReader reader = ConfigFactory::load;
RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
.withConfigReader(reader).build();
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
+import static akka.actor.ActorRef.noSender;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
+import akka.actor.Status.Success;
+import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
public class BucketStoreTest {
private static ActorSystem system;
+ private JavaTestKit kit;
+
@BeforeClass
public static void setup() {
system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
JavaTestKit.shutdownActorSystem(system);
}
+ @Before
+ public void before() {
+ kit = new JavaTestKit(system);
+ }
+
+ @After
+ public void after() {
+ kit.shutdown(system);
+ }
+
/**
* Given remote buckets, should merge with local copy of remote buckets.
*/
@Test
- public void testReceiveUpdateRemoteBuckets() {
-
- final BucketStore<T> store = createStore();
+ public void testReceiveUpdateRemoteBuckets() throws Exception {
+ final ActorRef store = createStore();
Address localAddress = system.provider().getDefaultAddress();
- Bucket<T> localBucket = new BucketImpl<>(new T());
+ Bucket<T> localBucket = new BucketImpl<>(0L, new T());
Address a1 = new Address("tcp", "system1");
Address a2 = new Address("tcp", "system2");
Address a3 = new Address("tcp", "system3");
- Bucket<T> b1 = new BucketImpl<>(new T());
- Bucket<T> b2 = new BucketImpl<>(new T());
- Bucket<T> b3 = new BucketImpl<>(new T());
+ Bucket<T> b1 = new BucketImpl<>(0L, new T());
+ Bucket<T> b2 = new BucketImpl<>(0L, new T());
+ Bucket<T> b3 = new BucketImpl<>(0L, new T());
Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
remoteBuckets.put(a1, b1);
remoteBuckets.put(a3, b3);
remoteBuckets.put(localAddress, localBucket);
+ Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
+ Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
+
//Given remote buckets
- store.receiveUpdateRemoteBuckets(remoteBuckets);
+ store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
- //Should NOT contain local bucket
- //Should contain ONLY 3 entries i.e a1, a2, a3
- Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
- Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
- Assert.assertTrue(remoteBucketsInStore.size() == 3);
+ //Should contain local bucket
+ //Should contain 4 entries i.e a1, a2, a3, local
+ Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
+ Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Add a new remote bucket
Address a4 = new Address("tcp", "system4");
- Bucket<T> b4 = new BucketImpl<>(new T());
+ Bucket<T> b4 = new BucketImpl<>(0L, new T());
remoteBuckets.clear();
remoteBuckets.put(a4, b4);
- store.receiveUpdateRemoteBuckets(remoteBuckets);
+ store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
//Should contain a4
- //Should contain 4 entries now i.e a1, a2, a3, a4
- remoteBucketsInStore = store.getRemoteBuckets();
+ //Should contain 5 entries now i.e a1, a2, a3, a4, local
+ remoteBucketsInStore = getBuckets(store);
Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
- Assert.assertTrue(remoteBucketsInStore.size() == 4);
+ Assert.assertTrue(remoteBucketsInStore.size() == 5);
//Update a bucket
- Bucket<T> b3New = new BucketImpl<>(new T());
+ Bucket<T> b3New = new BucketImpl<>(0L, new T());
remoteBuckets.clear();
remoteBuckets.put(a3, b3New);
remoteBuckets.put(a1, null);
remoteBuckets.put(a2, null);
- store.receiveUpdateRemoteBuckets(remoteBuckets);
+ store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
//Should only update a3
- remoteBucketsInStore = store.getRemoteBuckets();
+ remoteBucketsInStore = getBuckets(store);
Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
- Assert.assertTrue(remoteBucketsInStore.size() == 4);
+ Assert.assertTrue(remoteBucketsInStore.size() == 5);
//Should update versions map
//versions map contains versions for all remote buckets (4).
- Map<Address, Long> versionsInStore = store.getVersions();
+ Map<Address, Long> versionsInStore = getVersions(store);
Assert.assertEquals(4, versionsInStore.size());
Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
//Send older version of bucket
remoteBuckets.clear();
remoteBuckets.put(a3, b3);
- store.receiveUpdateRemoteBuckets(remoteBuckets);
+ store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
//Should NOT update a3
- remoteBucketsInStore = store.getRemoteBuckets();
+ remoteBucketsInStore = getBuckets(store);
b3InStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
-
}
/**
*
* @return instance of BucketStore class
*/
- private static BucketStore<T> createStore() {
- final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()),
- new T());
- final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
- return testRef.underlyingActor();
+ private ActorRef createStore() {
+ return kit.childActorOf(Props.create(TestingBucketStore.class,
+ new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
+ final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
+ Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
+ return result.getBuckets();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
+ return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
+ Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
+ }
+
+ private static final class TestingBucketStore extends BucketStore<T> {
+
+ private final List<ActorRef> toNotify = new ArrayList<>();
+
+ TestingBucketStore(final RemoteRpcProviderConfig config,
+ final String persistenceId,
+ final T initialData) {
+ super(config, persistenceId, initialData);
+ }
+
+ @Override
+ protected void handleCommand(Object message) throws Exception {
+ if (message instanceof WaitUntilDonePersisting) {
+ handlePersistAsk();
+ } else if (message instanceof SaveSnapshotSuccess) {
+ super.handleCommand(message);
+ handleSnapshotSuccess();
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
+ private void handlePersistAsk() {
+ if (isPersisting()) {
+ toNotify.add(getSender());
+ } else {
+ getSender().tell(new Success(null), noSender());
+ }
+ }
+
+ private void handleSnapshotSuccess() {
+ toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
+ }
+ }
+
+ /**
+ * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
+ */
+ private static final class WaitUntilDonePersisting {
+
}
}
akka {
loglevel = "DEBUG"
#loggers = ["akka.event.slf4j.Slf4jLogger"]
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
}
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10ms
}
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
}
memberA {
akka {
loglevel = "INFO"
loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
debug {
auto-down-unreachable-after = 10s
}
}
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
}
memberB {
bounded-mailbox {
loglevel = "INFO"
loggers = ["akka.event.slf4j.Slf4jLogger"]
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
actor {
provider = "akka.cluster.ClusterActorRefProvider"
debug {
auto-down-unreachable-after = 10s
}
}
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
}
memberC {
bounded-mailbox {
akka {
loglevel = "INFO"
loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
actor {
provider = "akka.cluster.ClusterActorRefProvider"
debug {
auto-down-unreachable-after = 10s
}
}
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ # Class name of the plugin.
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ # Dispatcher for the plugin actor.
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
}