*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
-import static akka.actor.ActorRef.noSender;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.Status.Success;
-import akka.pattern.Patterns;
-import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
public class BucketStoreTest {
private static ActorSystem system;
- private JavaTestKit kit;
-
@BeforeClass
public static void setup() {
system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
JavaTestKit.shutdownActorSystem(system);
}
- @Before
- public void before() {
- kit = new JavaTestKit(system);
- }
-
- @After
- public void after() {
- kit.shutdown(system);
- }
-
/**
* Given remote buckets, should merge with local copy of remote buckets.
*/
@Test
- public void testReceiveUpdateRemoteBuckets() throws Exception {
+ public void testReceiveUpdateRemoteBuckets() {
+
+ final BucketStoreActor<T> store = createStore();
- final ActorRef store = createStore();
Address localAddress = system.provider().getDefaultAddress();
Bucket<T> localBucket = new BucketImpl<>(0L, new T());
- Address a1 = new Address("tcp", "system1");
- Address a2 = new Address("tcp", "system2");
- Address a3 = new Address("tcp", "system3");
+ final Address a1 = new Address("tcp", "system1");
+ final Address a2 = new Address("tcp", "system2");
+ final Address a3 = new Address("tcp", "system3");
- Bucket<T> b1 = new BucketImpl<>(0L, new T());
- Bucket<T> b2 = new BucketImpl<>(0L, new T());
- Bucket<T> b3 = new BucketImpl<>(0L, new T());
-
- Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
- remoteBuckets.put(a1, b1);
- remoteBuckets.put(a2, b2);
- remoteBuckets.put(a3, b3);
- remoteBuckets.put(localAddress, localBucket);
-
- Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
- Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
+ final Bucket<T> b1 = new BucketImpl<>(0L, new T());
+ final Bucket<T> b2 = new BucketImpl<>(0L, new T());
+ final Bucket<T> b3 = new BucketImpl<>(0L, new T());
//Given remote buckets
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(ImmutableMap.of(a1, b1, a2, b2, localAddress, localBucket));
- //Should contain local bucket
- //Should contain 4 entries i.e a1, a2, a3, local
- Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
- Assert.assertTrue(remoteBucketsInStore.size() == 4);
+ //Should NOT contain local bucket
+ //Should contain ONLY 3 entries i.e a1, a2
+ Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
+ Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
+ Assert.assertTrue(remoteBucketsInStore.size() == 2);
//Add a new remote bucket
Address a4 = new Address("tcp", "system4");
Bucket<T> b4 = new BucketImpl<>(0L, new T());
- remoteBuckets.clear();
- remoteBuckets.put(a4, b4);
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(ImmutableMap.of(a4, b4));
//Should contain a4
- //Should contain 5 entries now i.e a1, a2, a3, a4, local
- remoteBucketsInStore = getBuckets(store);
+ //Should contain 4 entries now i.e a1, a2, a4
+ remoteBucketsInStore = store.getRemoteBuckets();
Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
- Assert.assertTrue(remoteBucketsInStore.size() == 5);
+ Assert.assertTrue(remoteBucketsInStore.size() == 3);
//Update a bucket
Bucket<T> b3New = new BucketImpl<>(0L, new T());
- remoteBuckets.clear();
+ Map<Address, Bucket<?>> remoteBuckets = new HashMap<>(3);
remoteBuckets.put(a3, b3New);
remoteBuckets.put(a1, null);
remoteBuckets.put(a2, null);
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(remoteBuckets);
//Should only update a3
- remoteBucketsInStore = getBuckets(store);
+ remoteBucketsInStore = store.getRemoteBuckets();
Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
- Assert.assertTrue(remoteBucketsInStore.size() == 5);
+ Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Should update versions map
//versions map contains versions for all remote buckets (4).
- Map<Address, Long> versionsInStore = getVersions(store);
+ Map<Address, Long> versionsInStore = store.getVersions();
Assert.assertEquals(4, versionsInStore.size());
Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
//Send older version of bucket
remoteBuckets.clear();
remoteBuckets.put(a3, b3);
- store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
+ store.updateRemoteBuckets(remoteBuckets);
//Should NOT update a3
- remoteBucketsInStore = getBuckets(store);
+ remoteBucketsInStore = store.getRemoteBuckets();
b3InStore = remoteBucketsInStore.get(a3);
Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
+
}
/**
*
* @return instance of BucketStore class
*/
- private ActorRef createStore() {
- return kit.childActorOf(Props.create(TestingBucketStore.class,
- new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
- }
-
- @SuppressWarnings("unchecked")
- private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
- final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
- Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
- return result.getBuckets();
+ private static BucketStoreActor<T> createStore() {
+ final Props props = Props.create(TestingBucketStoreActor.class,
+ new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T());
+ return TestActorRef.<BucketStoreActor<T>>create(system, props, "testStore").underlyingActor();
}
- @SuppressWarnings("unchecked")
- private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
- return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
- Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
- }
+ private static final class TestingBucketStoreActor extends BucketStoreActor<T> {
- private static final class TestingBucketStore extends BucketStore<T> {
-
- private final List<ActorRef> toNotify = new ArrayList<>();
-
- TestingBucketStore(final RemoteRpcProviderConfig config,
- final String persistenceId,
- final T initialData) {
+ protected TestingBucketStoreActor(final RemoteRpcProviderConfig config,
+ final String persistenceId,
+ final T initialData) {
super(config, persistenceId, initialData);
}
@Override
- protected void handleCommand(Object message) throws Exception {
- if (message instanceof WaitUntilDonePersisting) {
- handlePersistAsk();
- } else if (message instanceof SaveSnapshotSuccess) {
- super.handleCommand(message);
- handleSnapshotSuccess();
- } else {
- super.handleCommand(message);
- }
- }
+ protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
- private void handlePersistAsk() {
- if (isPersisting()) {
- toNotify.add(getSender());
- } else {
- getSender().tell(new Success(null), noSender());
- }
}
- private void handleSnapshotSuccess() {
- toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
- }
- }
-
- /**
- * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
- */
- private static final class WaitUntilDonePersisting {
+ @Override
+ protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+ }
}
}