X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreTest.java;h=59dd29b3142f5c8f80ad81dce17fa859c382eb57;hb=d04b71990a802071a786fe8f0df57bc4adbdec3f;hp=4e3961aac139f653a8886bf5eb18937891df21c9;hpb=e9fce74e37472296faa2faf1acbd110b74196032;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index 4e3961aac1..59dd29b314 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -7,22 +7,39 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; +import static akka.actor.ActorRef.noSender; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; +import akka.actor.Status.Success; +import akka.pattern.Patterns; +import akka.persistence.SaveSnapshotSuccess; import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.typesafe.config.ConfigFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.TerminationMonitor; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; public class BucketStoreTest { @@ -41,6 +58,8 @@ public class BucketStoreTest { private static ActorSystem system; + private JavaTestKit kit; + @BeforeClass public static void setup() { system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test")); @@ -52,24 +71,33 @@ public class BucketStoreTest { JavaTestKit.shutdownActorSystem(system); } + @Before + public void before() { + kit = new JavaTestKit(system); + } + + @After + public void after() { + kit.shutdown(system); + } + /** * Given remote buckets, should merge with local copy of remote buckets. */ @Test - public void testReceiveUpdateRemoteBuckets() { - - final BucketStore store = createStore(); + public void testReceiveUpdateRemoteBuckets() throws Exception { + final ActorRef store = createStore(); Address localAddress = system.provider().getDefaultAddress(); - Bucket localBucket = new BucketImpl<>(new T()); + Bucket localBucket = new BucketImpl<>(0L, new T()); Address a1 = new Address("tcp", "system1"); Address a2 = new Address("tcp", "system2"); Address a3 = new Address("tcp", "system3"); - Bucket b1 = new BucketImpl<>(new T()); - Bucket b2 = new BucketImpl<>(new T()); - Bucket b3 = new BucketImpl<>(new T()); + Bucket b1 = new BucketImpl<>(0L, new T()); + Bucket b2 = new BucketImpl<>(0L, new T()); + Bucket b3 = new BucketImpl<>(0L, new T()); Map> remoteBuckets = new HashMap<>(3); remoteBuckets.put(a1, b1); @@ -77,38 +105,40 @@ public class BucketStoreTest { remoteBuckets.put(a3, b3); remoteBuckets.put(localAddress, localBucket); + Await.result(Patterns.ask(store, new WaitUntilDonePersisting(), + Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf()); + //Given remote buckets - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); - //Should NOT contain local bucket - //Should contain ONLY 3 entries i.e a1, a2, a3 - Map> remoteBucketsInStore = store.getRemoteBuckets(); - Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress)); - Assert.assertTrue(remoteBucketsInStore.size() == 3); + //Should contain local bucket + //Should contain 4 entries i.e a1, a2, a3, local + Map> remoteBucketsInStore = getBuckets(store); + Assert.assertTrue(remoteBucketsInStore.size() == 4); //Add a new remote bucket Address a4 = new Address("tcp", "system4"); - Bucket b4 = new BucketImpl<>(new T()); + Bucket b4 = new BucketImpl<>(0L, new T()); remoteBuckets.clear(); remoteBuckets.put(a4, b4); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should contain a4 - //Should contain 4 entries now i.e a1, a2, a3, a4 - remoteBucketsInStore = store.getRemoteBuckets(); + //Should contain 5 entries now i.e a1, a2, a3, a4, local + remoteBucketsInStore = getBuckets(store); Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4)); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + Assert.assertTrue(remoteBucketsInStore.size() == 5); //Update a bucket - Bucket b3New = new BucketImpl<>(new T()); + Bucket b3New = new BucketImpl<>(0L, new T()); remoteBuckets.clear(); remoteBuckets.put(a3, b3New); remoteBuckets.put(a1, null); remoteBuckets.put(a2, null); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should only update a3 - remoteBucketsInStore = store.getRemoteBuckets(); + remoteBucketsInStore = getBuckets(store); Bucket b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion()); @@ -117,11 +147,11 @@ public class BucketStoreTest { Bucket b2InStore = remoteBucketsInStore.get(a2); Assert.assertEquals(b1.getVersion(), b1InStore.getVersion()); Assert.assertEquals(b2.getVersion(), b2InStore.getVersion()); - Assert.assertTrue(remoteBucketsInStore.size() == 4); + Assert.assertTrue(remoteBucketsInStore.size() == 5); //Should update versions map //versions map contains versions for all remote buckets (4). - Map versionsInStore = store.getVersions(); + Map versionsInStore = getVersions(store); Assert.assertEquals(4, versionsInStore.size()); Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1)); Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2)); @@ -131,13 +161,12 @@ public class BucketStoreTest { //Send older version of bucket remoteBuckets.clear(); remoteBuckets.put(a3, b3); - store.receiveUpdateRemoteBuckets(remoteBuckets); + store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender()); //Should NOT update a3 - remoteBucketsInStore = store.getRemoteBuckets(); + remoteBucketsInStore = getBuckets(store); b3InStore = remoteBucketsInStore.get(a3); Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion()); - } /** @@ -145,10 +174,63 @@ public class BucketStoreTest { * * @return instance of BucketStore class */ - private static BucketStore createStore() { - final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()), - new T()); - final TestActorRef> testRef = TestActorRef.create(system, props, "testStore"); - return testRef.underlyingActor(); + private ActorRef createStore() { + return kit.childActorOf(Props.create(TestingBucketStore.class, + new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T())); + } + + @SuppressWarnings("unchecked") + private static Map> getBuckets(final ActorRef store) throws Exception { + final GetAllBucketsReply result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(), + Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf()); + return result.getBuckets(); + } + + @SuppressWarnings("unchecked") + private static Map getVersions(final ActorRef store) throws Exception { + return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(), + Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions(); + } + + private static final class TestingBucketStore extends BucketStore { + + private final List toNotify = new ArrayList<>(); + + TestingBucketStore(final RemoteRpcProviderConfig config, + final String persistenceId, + final T initialData) { + super(config, persistenceId, initialData); + } + + @Override + protected void handleCommand(Object message) throws Exception { + if (message instanceof WaitUntilDonePersisting) { + handlePersistAsk(); + } else if (message instanceof SaveSnapshotSuccess) { + super.handleCommand(message); + handleSnapshotSuccess(); + } else { + super.handleCommand(message); + } + } + + private void handlePersistAsk() { + if (isPersisting()) { + toNotify.add(getSender()); + } else { + getSender().tell(new Success(null), noSender()); + } + } + + private void handleSnapshotSuccess() { + toNotify.forEach(ref -> ref.tell(new Success(null), noSender())); + } + } + + /** + * Message sent to the TestingBucketStore that replies with success once the actor is done persisting. + */ + private static final class WaitUntilDonePersisting { + } }