X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreTest.java;h=59dd29b3142f5c8f80ad81dce17fa859c382eb57;hp=ddd08a5f4723f4c6669082c2fe4fcef3c6ff94af;hb=d04b71990a802071a786fe8f0df57bc4adbdec3f;hpb=294e250fa1c11f36a6ddb9470f985df978039355 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index ddd08a5f47..59dd29b314 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -7,122 +7,166 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; +import static akka.actor.ActorRef.noSender; + +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; -import akka.testkit.TestActorRef; +import akka.actor.Status.Success; +import akka.pattern.Patterns; +import akka.persistence.SaveSnapshotSuccess; +import akka.testkit.JavaTestKit; +import akka.util.Timeout; import com.typesafe.config.ConfigFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; public class BucketStoreTest { + /** + * Dummy class to eliminate rawtype warnings. + * + * @author gwu + * + */ + private static class T implements BucketData { + @Override + public Optional getWatchActor() { + return Optional.empty(); + } + } + private static ActorSystem system; + private JavaTestKit kit; + @BeforeClass public static void setup() { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); } @AfterClass public static void teardown() { - system.shutdown(); + JavaTestKit.shutdownActorSystem(system); + } + + @Before + public void before() { + kit = new JavaTestKit(system); + } + + @After + public void after() { + kit.shutdown(system); } /** - * Given remote buckets - * Should merge with local copy of remote buckets + * Given remote buckets, should merge with local copy of remote buckets. */ @Test - public void testReceiveUpdateRemoteBuckets(){ - - BucketStore store = createStore(); + public void testReceiveUpdateRemoteBuckets() throws Exception { + final ActorRef store = createStore(); Address localAddress = system.provider().getDefaultAddress(); - Bucket localBucket = new BucketImpl(); + Bucket localBucket = new BucketImpl<>(0L, new T()); Address a1 = new Address("tcp", "system1"); Address a2 = new Address("tcp", "system2"); Address a3 = new Address("tcp", "system3"); - Bucket b1 = new BucketImpl(); - Bucket b2 = new BucketImpl(); - Bucket b3 = new BucketImpl(); + Bucket b1 = new BucketImpl<>(0L, new T()); + Bucket b2 = new BucketImpl<>(0L, new T()); + Bucket b3 = new BucketImpl<>(0L, new T()); - Map remoteBuckets = new HashMap<>(3); + Map> remoteBuckets = new HashMap<>(3); remoteBuckets.put(a1, b1); remoteBuckets.put(a2, b2); remoteBuckets.put(a3, b3); remoteBuckets.put(localAddress, localBucket); + Await.result(Patterns.ask(store, new WaitUntilDonePersisting(), + Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf()); + //Given remote buckets - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); - //Should NOT contain local bucket - //Should contain ONLY 3 entries i.e a1, a2, a3 - Map> remoteBucketsInStore = store.getRemoteBuckets(); - Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); - Assert.assertTrue(remoteBucketsInStore.size() == 3); + //Should contain local bucket + //Should contain 4 entries i.e a1, a2, a3, local + Map> remoteBucketsInStore = getBuckets(store); + Assert.assertTrue(remoteBucketsInStore.size() == 4); //Add a new remote bucket Address a4 = new Address("tcp", "system4"); - Bucket b4 = new BucketImpl(); + Bucket b4 = new BucketImpl<>(0L, new T()); remoteBuckets.clear(); remoteBuckets.put(a4, b4); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should contain a4 - //Should contain 4 entries now i.e a1, a2, a3, a4 - remoteBucketsInStore = store.getRemoteBuckets(); + //Should contain 5 entries now i.e a1, a2, a3, a4, local + remoteBucketsInStore = getBuckets(store); Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4)); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + Assert.assertTrue(remoteBucketsInStore.size() == 5); //Update a bucket - Bucket b3_new = new BucketImpl(); + Bucket b3New = new BucketImpl<>(0L, new T()); remoteBuckets.clear(); - remoteBuckets.put(a3, b3_new); + remoteBuckets.put(a3, b3New); remoteBuckets.put(a1, null); remoteBuckets.put(a2, null); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should only update a3 - remoteBucketsInStore = store.getRemoteBuckets(); - Bucket b3_inStore = remoteBucketsInStore.get(a3); - Assert.assertEquals(b3_new.getVersion(), b3_inStore.getVersion()); + remoteBucketsInStore = getBuckets(store); + Bucket b3InStore = remoteBucketsInStore.get(a3); + Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion()); //Should NOT update a1 and a2 - Bucket b1_inStore = remoteBucketsInStore.get(a1); - Bucket b2_inStore = remoteBucketsInStore.get(a2); - Assert.assertEquals(b1.getVersion(), b1_inStore.getVersion()); - Assert.assertEquals(b2.getVersion(), b2_inStore.getVersion()); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + Bucket b1InStore = remoteBucketsInStore.get(a1); + Bucket b2InStore = remoteBucketsInStore.get(a2); + Assert.assertEquals(b1.getVersion(), b1InStore.getVersion()); + Assert.assertEquals(b2.getVersion(), b2InStore.getVersion()); + Assert.assertTrue(remoteBucketsInStore.size() == 5); //Should update versions map //versions map contains versions for all remote buckets (4). - Map versionsInStore = store.getVersions(); + Map versionsInStore = getVersions(store); Assert.assertEquals(4, versionsInStore.size()); - Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1)); - Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2)); - Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3)); - Assert.assertEquals(b4.getVersion(), versionsInStore.get(a4)); + Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1)); + Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2)); + Assert.assertEquals((Long)b3New.getVersion(), versionsInStore.get(a3)); + Assert.assertEquals((Long)b4.getVersion(), versionsInStore.get(a4)); //Send older version of bucket remoteBuckets.clear(); remoteBuckets.put(a3, b3); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should NOT update a3 - remoteBucketsInStore = store.getRemoteBuckets(); - b3_inStore = remoteBucketsInStore.get(a3); - Assert.assertTrue(b3_inStore.getVersion().longValue() == b3_new.getVersion().longValue()); - + remoteBucketsInStore = getBuckets(store); + b3InStore = remoteBucketsInStore.get(a3); + Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion()); } /** @@ -130,9 +174,63 @@ public class BucketStoreTest { * * @return instance of BucketStore class */ - private static BucketStore createStore(){ - final Props props = Props.create(BucketStore.class); - final TestActorRef testRef = TestActorRef.create(system, props, "testStore"); - return testRef.underlyingActor(); + private ActorRef createStore() { + return kit.childActorOf(Props.create(TestingBucketStore.class, + new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T())); + } + + @SuppressWarnings("unchecked") + private static Map> getBuckets(final ActorRef store) throws Exception { + final GetAllBucketsReply result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(), + Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf()); + return result.getBuckets(); + } + + @SuppressWarnings("unchecked") + private static Map getVersions(final ActorRef store) throws Exception { + return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(), + Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions(); + } + + private static final class TestingBucketStore extends BucketStore { + + private final List toNotify = new ArrayList<>(); + + TestingBucketStore(final RemoteRpcProviderConfig config, + final String persistenceId, + final T initialData) { + super(config, persistenceId, initialData); + } + + @Override + protected void handleCommand(Object message) throws Exception { + if (message instanceof WaitUntilDonePersisting) { + handlePersistAsk(); + } else if (message instanceof SaveSnapshotSuccess) { + super.handleCommand(message); + handleSnapshotSuccess(); + } else { + super.handleCommand(message); + } + } + + private void handlePersistAsk() { + if (isPersisting()) { + toNotify.add(getSender()); + } else { + getSender().tell(new Success(null), noSender()); + } + } + + private void handleSnapshotSuccess() { + toNotify.forEach(ref -> ref.tell(new Success(null), noSender())); + } + } + + /** + * Message sent to the TestingBucketStore that replies with success once the actor is done persisting. + */ + private static final class WaitUntilDonePersisting { + } -} \ No newline at end of file +}