X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=6c517a464f5ed66b4304ed0d62fbbab7c35e9eff;hb=refs%2Fchanges%2F43%2F29943%2F2;hp=f097c19e512a3766a3d836c5cc7e061862b29666;hpb=68f0b724084bcf42d775ad1f6e40c02dfcedf3d3;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index f097c19e51..6c517a464f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -279,7 +279,7 @@ public class ShardTest extends AbstractShardTest { final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), "testRegisterDataTreeChangeListener-DataTreeChangeListener"); - shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef()); + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterDataTreeChangeListenerReply.class); @@ -347,7 +347,7 @@ public class ShardTest extends AbstractShardTest { assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); + shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterDataTreeChangeListenerReply.class); assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); @@ -2517,146 +2517,150 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataChangeListernerDelayedRegistration() throws Exception { + public void testClusteredDataChangeListenerDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - dataStoreContextBuilder.persistent(false); - final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); - final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); - final Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - boolean firstElectionTimeout = true; - - @Override - public Shard create() throws Exception { - return new Shard(newShardBuilder()) { - @Override - public void onReceiveCommand(final Object message) throws Exception { - if(message instanceof ElectionTimeout && firstElectionTimeout) { - firstElectionTimeout = false; - final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); - - onFirstElectionTimeout.countDown(); - } else { - super.onReceiveCommand(message); - } - } - }; - } - }; + String testName = "testClusteredDataChangeListenerDelayedRegistration"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), - "testDataChangeListenerOnFollower-DataChangeListener"); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + actorFactory.generateActorId(testName + "-DataChangeListener")); - final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()). - withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower"); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); - assertEquals("Got first ElectionTimeout", true, - onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - - shard.tell(new FindLeader(), getRef()); - final FindLeaderReply findLeadeReply = - expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + waitUntilNoLeader(shard); final YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - onChangeListenerRegistered.countDown(); + shard.tell(new ElectionTimeout(), ActorRef.noSender()); listener.waitForChangeEvents(); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test - public void testClusteredDataChangeListernerRegistration() throws Exception { - dataStoreContextBuilder.persistent(false).build(); + public void testClusteredDataChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); - - final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2") - .shardName("inventory").type("config").build(); - final Creator followerShardCreator = new Creator() { - private static final long serialVersionUID = 1L; + String testName = "testClusteredDataChangeListenerRegistration"; + final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build(); + + final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build(); + + final TestActorRef followerShard = actorFactory.createTestActor( + Shard.builder().id(followerShardID). + datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()). + peerAddresses(Collections.singletonMap(leaderShardID.toString(), + "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); + + final TestActorRef leaderShard = actorFactory.createTestActor( + Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(followerShardID.toString(), + "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); + + leaderShard.tell(new ElectionTimeout(), ActorRef.noSender()); + String leaderPath = waitUntilLeader(followerShard); + assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); - @Override - public Shard create() throws Exception { - return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()). - peerAddresses(Collections.singletonMap(member2ShardID.toString(), - "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) { - @Override - public void onReceiveCommand(final Object message) throws Exception { + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataChangeListener listener = new MockDataChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + actorFactory.generateActorId(testName + "-DataChangeListener")); - if(!(message instanceof ElectionTimeout)) { - super.onReceiveCommand(message); - } - } - }; - } - }; + followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); + final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); - final Creator leaderShardCreator = new Creator() { - private static final long serialVersionUID = 1L; + writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - @Override - public Shard create() throws Exception { - return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()). - peerAddresses(Collections.singletonMap(member1ShardID.toString(), - "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {}; - } - }; + listener.waitForChangeEvents(); + }}; + } + @Test + public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000); - final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(followerShardCreator)), - member1ShardID.toString()); + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + actorFactory.generateActorId(testName + "-DataTreeChangeListener")); - final TestActorRef shardLeader = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()), - member2ShardID.toString()); - // Sleep to let election happen - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); - shard.tell(new FindLeader(), getRef()); - final FindLeaderReply findLeaderReply = - expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor()); + waitUntilNoLeader(shard); final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), - "testDataChangeListenerOnFollower-DataChangeListener"); - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + shard.tell(new ElectionTimeout(), ActorRef.noSender()); + listener.waitForChangeEvents(); + }}; + } - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + @Test + public void testClusteredDataTreeChangeListenerRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + String testName = "testClusteredDataTreeChangeListenerRegistration"; + final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build(); + + final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build(); + + final TestActorRef followerShard = actorFactory.createTestActor( + Shard.builder().id(followerShardID). + datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()). + peerAddresses(Collections.singletonMap(leaderShardID.toString(), + "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); + + final TestActorRef leaderShard = actorFactory.createTestActor( + Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(followerShardID.toString(), + "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); + + leaderShard.tell(new ElectionTimeout(), ActorRef.noSender()); + String leaderPath = waitUntilLeader(followerShard); + assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); + + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + + followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); + + writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); }}; }