X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreTest.java;h=4898b27ec9ec0e97341caf68131487ce5b0276b6;hp=59dd29b3142f5c8f80ad81dce17fa859c382eb57;hb=5b66dd8f5e3467a07e77b20fe696b29993ce5565;hpb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 59dd29b314..4898b27ec9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -7,39 +7,23 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; -import static akka.actor.ActorRef.noSender; - import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; -import akka.actor.Status.Success; -import akka.pattern.Patterns; -import akka.persistence.SaveSnapshotSuccess; import akka.testkit.JavaTestKit; -import akka.util.Timeout; +import akka.testkit.TestActorRef; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; public class BucketStoreTest { @@ -58,8 +42,6 @@ public class BucketStoreTest { private static ActorSystem system; - private JavaTestKit kit; - @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); @@ -71,74 +53,55 @@ public class BucketStoreTest { JavaTestKit.shutdownActorSystem(system); } - @Before - public void before() { - kit = new JavaTestKit(system); - } - - @After - public void after() { - kit.shutdown(system); - } - /** * Given remote buckets, should merge with local copy of remote buckets. */ @Test - public void testReceiveUpdateRemoteBuckets() throws Exception { + public void testReceiveUpdateRemoteBuckets() { + + final BucketStoreActor store = createStore(); - final ActorRef store = createStore(); Address localAddress = system.provider().getDefaultAddress(); Bucket localBucket = new BucketImpl<>(0L, new T()); - Address a1 = new Address("tcp", "system1"); - Address a2 = new Address("tcp", "system2"); - Address a3 = new Address("tcp", "system3"); + final Address a1 = new Address("tcp", "system1"); + final Address a2 = new Address("tcp", "system2"); + final Address a3 = new Address("tcp", "system3"); - Bucket b1 = new BucketImpl<>(0L, new T()); - Bucket b2 = new BucketImpl<>(0L, new T()); - Bucket b3 = new BucketImpl<>(0L, new T()); - - Map> remoteBuckets = new HashMap<>(3); - remoteBuckets.put(a1, b1); - remoteBuckets.put(a2, b2); - remoteBuckets.put(a3, b3); - remoteBuckets.put(localAddress, localBucket); - - Await.result(Patterns.ask(store, new WaitUntilDonePersisting(), - Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf()); + final Bucket b1 = new BucketImpl<>(0L, new T()); + final Bucket b2 = new BucketImpl<>(0L, new T()); + final Bucket b3 = new BucketImpl<>(0L, new T()); //Given remote buckets - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(ImmutableMap.of(a1, b1, a2, b2, localAddress, localBucket)); - //Should contain local bucket - //Should contain 4 entries i.e a1, a2, a3, local - Map> remoteBucketsInStore = getBuckets(store); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + //Should NOT contain local bucket + //Should contain ONLY 3 entries i.e a1, a2 + Map> remoteBucketsInStore = store.getRemoteBuckets(); + Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); + Assert.assertTrue(remoteBucketsInStore.size() == 2); //Add a new remote bucket Address a4 = new Address("tcp", "system4"); Bucket b4 = new BucketImpl<>(0L, new T()); - remoteBuckets.clear(); - remoteBuckets.put(a4, b4); - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(ImmutableMap.of(a4, b4)); //Should contain a4 - //Should contain 5 entries now i.e a1, a2, a3, a4, local - remoteBucketsInStore = getBuckets(store); + //Should contain 4 entries now i.e a1, a2, a4 + remoteBucketsInStore = store.getRemoteBuckets(); Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4)); - Assert.assertTrue(remoteBucketsInStore.size() == 5); + Assert.assertTrue(remoteBucketsInStore.size() == 3); //Update a bucket Bucket b3New = new BucketImpl<>(0L, new T()); - remoteBuckets.clear(); + Map> remoteBuckets = new HashMap<>(3); remoteBuckets.put(a3, b3New); remoteBuckets.put(a1, null); remoteBuckets.put(a2, null); - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(remoteBuckets); //Should only update a3 - remoteBucketsInStore = getBuckets(store); + remoteBucketsInStore = store.getRemoteBuckets(); Bucket b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion()); @@ -147,11 +110,11 @@ public class BucketStoreTest { Bucket b2InStore = remoteBucketsInStore.get(a2); Assert.assertEquals(b1.getVersion(), b1InStore.getVersion()); Assert.assertEquals(b2.getVersion(), b2InStore.getVersion()); - Assert.assertTrue(remoteBucketsInStore.size() == 5); + Assert.assertTrue(remoteBucketsInStore.size() == 4); //Should update versions map //versions map contains versions for all remote buckets (4). - Map versionsInStore = getVersions(store); + Map versionsInStore = store.getVersions(); Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2)); @@ -161,12 +124,13 @@ public class BucketStoreTest { //Send older version of bucket remoteBuckets.clear(); remoteBuckets.put(a3, b3); - store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); + store.updateRemoteBuckets(remoteBuckets); //Should NOT update a3 - remoteBucketsInStore = getBuckets(store); + remoteBucketsInStore = store.getRemoteBuckets(); b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion()); + } /** @@ -174,63 +138,28 @@ public class BucketStoreTest { * * @return instance of BucketStore class */ - private ActorRef createStore() { - return kit.childActorOf(Props.create(TestingBucketStore.class, - new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T())); - } - - @SuppressWarnings("unchecked") - private static Map> getBuckets(final ActorRef store) throws Exception { - final GetAllBucketsReply result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(), - Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf()); - return result.getBuckets(); + private static BucketStoreActor createStore() { + final Props props = Props.create(TestingBucketStoreActor.class, + new RemoteRpcProviderConfig(system.settings().config()), "testing-store",new T()); + return TestActorRef.>create(system, props, "testStore").underlyingActor(); } - @SuppressWarnings("unchecked") - private static Map getVersions(final ActorRef store) throws Exception { - return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(), - Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions(); - } + private static final class TestingBucketStoreActor extends BucketStoreActor { - private static final class TestingBucketStore extends BucketStore { - - private final List toNotify = new ArrayList<>(); - - TestingBucketStore(final RemoteRpcProviderConfig config, - final String persistenceId, - final T initialData) { + protected TestingBucketStoreActor(final RemoteRpcProviderConfig config, + final String persistenceId, + final T initialData) { super(config, persistenceId, initialData); } @Override - protected void handleCommand(Object message) throws Exception { - if (message instanceof WaitUntilDonePersisting) { - handlePersistAsk(); - } else if (message instanceof SaveSnapshotSuccess) { - super.handleCommand(message); - handleSnapshotSuccess(); - } else { - super.handleCommand(message); - } - } + protected void onBucketRemoved(final Address address, final Bucket bucket) { - private void handlePersistAsk() { - if (isPersisting()) { - toNotify.add(getSender()); - } else { - getSender().tell(new Success(null), noSender()); - } } - private void handleSnapshotSuccess() { - toNotify.forEach(ref -> ref.tell(new Success(null), noSender())); - } - } - - /** - * Message sent to the TestingBucketStore that replies with success once the actor is done persisting. - */ - private static final class WaitUntilDonePersisting { + @Override + protected void onBucketsUpdated(final Map> newBuckets) { + } } }