X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=49cfcbc9c257a31a5e7ad51585ebda185d8f9fd5;hb=103ceecd0195cca6c87fbd62a687d8addf128784;hp=3d28672c9fd08906e2fba0a063dbcf10e4326ced;hpb=2b2517144e4eb9c17d9b41e9d9ec20d0264f5e12;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 3d28672c9f..49cfcbc9c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; @@ -29,7 +30,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -41,6 +41,7 @@ import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -54,9 +55,13 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -64,8 +69,8 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -80,8 +85,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -94,6 +101,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -246,6 +254,110 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testRegisterDataTreeChangeListener() throws Exception { + new ShardTestKit(getSystem()) {{ + TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps(), "testRegisterDataTreeChangeListener"); + + waitUntilLeader(shard); + + shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); + + MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + "testRegisterDataTreeChangeListener-DataTreeChangeListener"); + + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef()); + + RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterDataTreeChangeListenerReply.class); + String replyPath = reply.getListenerRegistrationPath().toString(); + assertTrue("Incorrect reply path: " + replyPath, replyPath.matches( + "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*")); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @SuppressWarnings("serial") + @Test + public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); + final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); + Creator creator = new Creator() { + boolean firstElectionTimeout = true; + + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + @Override + public void onReceiveCommand(final Object message) throws Exception { + if(message instanceof ElectionTimeout && firstElectionTimeout) { + firstElectionTimeout = false; + final ActorRef self = getSelf(); + new Thread() { + @Override + public void run() { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + } + }.start(); + + onFirstElectionTimeout.countDown(); + } else { + super.onReceiveCommand(message); + } + } + }; + } + }; + + MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); + + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), + "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + assertEquals("Got first ElectionTimeout", true, + onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); + + shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); + RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + + shard.tell(new FindLeader(), getRef()); + FindLeaderReply findLeadeReply = + expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + onChangeListenerRegistered.countDown(); + + // TODO: investigate why we do not receive data chage events + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCreateTransaction(){ new ShardTestKit(getSystem()) {{ @@ -256,7 +368,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shard.tell(new CreateTransaction("txn-1", - TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); + TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); @@ -277,7 +389,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); shard.tell(new CreateTransaction("txn-1", - TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), + TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), getRef()); CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), @@ -852,7 +964,7 @@ public class ShardTest extends AbstractShardTest { // Create a read Tx on the same chain. - shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() , + shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() , transactionChainID).toSerializable(), getRef()); CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); @@ -961,6 +1073,82 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testReadyLocalTransactionWithImmediateCommit() throws Exception{ + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testReadyLocalTransactionWithImmediateCommit"); + + waitUntilLeader(shard); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification(); + + ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + new WriteModification(TestModel.TEST_PATH, writeData).apply(modification); + MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification); + + String txId = "tx1"; + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true); + + shard.tell(readyMessage, getRef()); + + expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); + assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{ + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testReadyLocalTransactionWithThreePhaseCommit"); + + waitUntilLeader(shard); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification(); + + ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + new WriteModification(TestModel.TEST_PATH, writeData).apply(modification); + MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification); + + String txId = "tx1"; + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false); + + shard.tell(readyMessage, getRef()); + + expectMsgClass(ReadyTransactionReply.class); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + // Send the CanCommitTransaction message. + + shard.tell(new CommitTransaction(txId).toSerializable(), getRef()); + expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); + assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCommitWithPersistenceDisabled() throws Throwable { dataStoreContextBuilder.persistent(false); @@ -1077,12 +1265,15 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(cohort).preCommit(); inOrder.verify(cohort).commit(); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); + ShardStats shardStats = expectMsgClass(duration, ShardStats.class); + // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1,shardStats.getCommittedTransactionsCount()); // Commit index should not advance because this does not go into the journal - assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex()); + assertEquals(-1, shardStats.getCommitIndex()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1133,12 +1324,15 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(cohort).preCommit(); inOrder.verify(cohort).commit(); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); + ShardStats shardStats = expectMsgClass(duration, ShardStats.class); + // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1, shardStats.getCommittedTransactionsCount()); // Commit index should advance as we do not have an empty modification - assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); + assertEquals(0, shardStats.getCommitIndex()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1632,7 +1826,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitQueueCapacityExceeded() throws Throwable { - dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1); + dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), @@ -1672,9 +1866,11 @@ public class ShardTest extends AbstractShardTest { cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); + // The 3rd Tx should exceed queue capacity and fail. + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, cohort3, modification3, true, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); + expectMsgClass(duration, akka.actor.Status.Failure.class); // canCommit 1st Tx. @@ -1694,6 +1890,125 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable { + dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); + + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionCommitWithPriorExpiredCohortEntries"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); + + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // All Tx's are readied. We'll send canCommit for the last one but not the others. The others + // should expire from the queue and the last one should be processed. + + shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable { + dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); + + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionCommitWithSubsequentExpiredCohortEntry"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // CanCommit the first one so it's the current in-progress CohortEntry. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Ready the second Tx. + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // Ready the third Tx. + + String transactionID3 = "tx3"; + DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification(); + new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)) + .apply(modification3); + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true); + + shard.tell(readyMessage, getRef()); + + // Commit the first Tx. After completing, the second should expire from the queue and the third + // Tx committed. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + // Expect commit reply from the third Tx. + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); + assertNotNull(TestModel.TEST2_PATH + " not found", node); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCanCommitBeforeReadyFailure() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -1985,14 +2300,27 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterRoleChangeListener(), listener); - // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore - // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary - // sleep. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class); + + ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", true, + leaderStateChanged.getLocalShardDataTree().isPresent()); + assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), + leaderStateChanged.getLocalShardDataTree().get()); + + MessageCollectorActor.clearMessages(listener); - List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + // Force a leader change - assertEquals(1, allMatching.size()); + shard.tell(new RequestVote(10000, "member2", 50, 50), getRef()); + + leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", false, + leaderStateChanged.getLocalShardDataTree().isPresent()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }; }