X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=1ecf0971c10d3f0a429f3c56ea36d84eaa44befc;hp=3d28672c9fd08906e2fba0a063dbcf10e4326ced;hb=a2c4e27ea137ce9e2929916b2964116c4df188a0;hpb=ba3433c7bf94e551a4ea520c95165a9358bf9227 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 3d28672c9f..1ecf0971c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; @@ -29,7 +30,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -57,6 +57,9 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -64,8 +67,8 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -80,8 +83,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -246,6 +251,110 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testRegisterDataTreeChangeListener() throws Exception { + new ShardTestKit(getSystem()) {{ + TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps(), "testRegisterDataTreeChangeListener"); + + waitUntilLeader(shard); + + shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); + + MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + "testRegisterDataTreeChangeListener-DataTreeChangeListener"); + + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef()); + + RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterDataTreeChangeListenerReply.class); + String replyPath = reply.getListenerRegistrationPath().toString(); + assertTrue("Incorrect reply path: " + replyPath, replyPath.matches( + "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*")); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @SuppressWarnings("serial") + @Test + public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); + final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); + Creator creator = new Creator() { + boolean firstElectionTimeout = true; + + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + @Override + public void onReceiveCommand(final Object message) throws Exception { + if(message instanceof ElectionTimeout && firstElectionTimeout) { + firstElectionTimeout = false; + final ActorRef self = getSelf(); + new Thread() { + @Override + public void run() { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + } + }.start(); + + onFirstElectionTimeout.countDown(); + } else { + super.onReceiveCommand(message); + } + } + }; + } + }; + + MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); + + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), + "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + assertEquals("Got first ElectionTimeout", true, + onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); + + shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); + RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + + shard.tell(new FindLeader(), getRef()); + FindLeaderReply findLeadeReply = + expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + onChangeListenerRegistered.countDown(); + + // TODO: investigate why we do not receive data chage events + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCreateTransaction(){ new ShardTestKit(getSystem()) {{ @@ -1985,14 +2094,27 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterRoleChangeListener(), listener); - // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore - // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary - // sleep. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class); - List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", true, + leaderStateChanged.getLocalShardDataTree().isPresent()); + assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), + leaderStateChanged.getLocalShardDataTree().get()); - assertEquals(1, allMatching.size()); + MessageCollectorActor.clearMessages(listener); + + // Force a leader change + + shard.tell(new RequestVote(10000, "member2", 50, 50), getRef()); + + leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", false, + leaderStateChanged.getLocalShardDataTree().isPresent()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }; }