X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=70869ee68a55644cf7aa8c5103031a72a881d703;hb=376df7be9839100e31e6916d6e685dda9f8bd030;hp=3d28672c9fd08906e2fba0a063dbcf10e4326ced;hpb=ba3433c7bf94e551a4ea520c95165a9358bf9227;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 3d28672c9f..70869ee68a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; @@ -14,6 +15,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; @@ -29,7 +31,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -41,6 +42,7 @@ import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -54,9 +56,13 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -64,8 +70,8 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; @@ -80,8 +86,10 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; @@ -94,6 +102,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -113,6 +122,31 @@ import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); + private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"; + + final CountDownLatch recoveryComplete = new CountDownLatch(1); + + protected Props newShardPropsWithRecoveryComplete() { + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT) { + @Override + protected void onRecoveryComplete() { + try { + super.onRecoveryComplete(); + } finally { + recoveryComplete.countDown(); + } + } + }; + } + }; + return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); + } + @Test public void testRegisterChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ @@ -246,6 +280,110 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testRegisterDataTreeChangeListener() throws Exception { + new ShardTestKit(getSystem()) {{ + TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps(), "testRegisterDataTreeChangeListener"); + + waitUntilLeader(shard); + + shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender()); + + MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + "testRegisterDataTreeChangeListener-DataTreeChangeListener"); + + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef()); + + RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), + RegisterDataTreeChangeListenerReply.class); + String replyPath = reply.getListenerRegistrationPath().toString(); + assertTrue("Incorrect reply path: " + replyPath, replyPath.matches( + "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*")); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @SuppressWarnings("serial") + @Test + public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); + final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); + Creator creator = new Creator() { + boolean firstElectionTimeout = true; + + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { + @Override + public void onReceiveCommand(final Object message) throws Exception { + if(message instanceof ElectionTimeout && firstElectionTimeout) { + firstElectionTimeout = false; + final ActorRef self = getSelf(); + new Thread() { + @Override + public void run() { + Uninterruptibles.awaitUninterruptibly( + onChangeListenerRegistered, 5, TimeUnit.SECONDS); + self.tell(message, self); + } + }.start(); + + onFirstElectionTimeout.countDown(); + } else { + super.onReceiveCommand(message); + } + } + }; + } + }; + + MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), + "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener"); + + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), + "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration"); + + YangInstanceIdentifier path = TestModel.TEST_PATH; + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + assertEquals("Got first ElectionTimeout", true, + onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); + + shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); + RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + + shard.tell(new FindLeader(), getRef()); + FindLeaderReply findLeadeReply = + expectMsgClass(duration("5 seconds"), FindLeaderReply.class); + assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + + writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + onChangeListenerRegistered.countDown(); + + // TODO: investigate why we do not receive data chage events + listener.waitForChangeEvents(); + + dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCreateTransaction(){ new ShardTestKit(getSystem()) {{ @@ -256,7 +394,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shard.tell(new CreateTransaction("txn-1", - TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); + TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); @@ -277,7 +415,7 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); shard.tell(new CreateTransaction("txn-1", - TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), + TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), getRef()); CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), @@ -385,7 +523,9 @@ public class ShardTest extends AbstractShardTest { @Test public void testApplyStateWithCandidatePayload() throws Exception { - TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + TestActorRef shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState"); + + recoveryComplete.await(5, TimeUnit.SECONDS); NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); @@ -430,8 +570,10 @@ public class ShardTest extends AbstractShardTest { final DataTreeModification writeMod = source.takeSnapshot().newModification(); writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); + // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); int nListEntries = 16; Set listEntryKeys = new HashSet<>(); @@ -446,11 +588,11 @@ public class ShardTest extends AbstractShardTest { final DataTreeModification mod = source.takeSnapshot().newModification(); mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, payloadForModification(source, mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyJournalEntries(nListEntries)); testRecovery(listEntryKeys); @@ -464,7 +606,9 @@ public class ShardTest extends AbstractShardTest { // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload( + InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); + + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); @@ -478,11 +622,11 @@ public class ShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1, newModificationPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyJournalEntries(nListEntries)); testRecovery(listEntryKeys); @@ -673,16 +817,18 @@ public class ShardTest extends AbstractShardTest { } private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, - NormalizedNode data, boolean ready, boolean doCommitOnReady) { - return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); + NormalizedNode data, boolean ready, boolean doCommitOnReady, int messagesSent) { + return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent); } private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, - YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { + YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady, + int messagesSent) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); batched.setReady(ready); batched.setDoCommitOnReady(doCommitOnReady); + batched.setTotalMessagesSent(messagesSent); return batched; } @@ -715,18 +861,18 @@ public class ShardTest extends AbstractShardTest { // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -783,18 +929,18 @@ public class ShardTest extends AbstractShardTest { // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); @@ -811,6 +957,32 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test(expected=IllegalStateException.class) + public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsReadyWithIncorrectTotalMessageCount"); + + waitUntilLeader(shard); + + String transactionID = "tx1"; + BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.setTotalMessagesSent(2); + + shard.tell(batched, getRef()); + + Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + if(failure != null) { + throw failure.cause(); + } + }}; + } + @SuppressWarnings("unchecked") private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); @@ -847,12 +1019,12 @@ public class ShardTest extends AbstractShardTest { ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, - containerNode, true, false), getRef()); + containerNode, true, false, 1), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain. - shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() , + shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() , transactionChainID).toSerializable(), getRef()); CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); @@ -961,6 +1133,82 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testReadyLocalTransactionWithImmediateCommit() throws Exception{ + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testReadyLocalTransactionWithImmediateCommit"); + + waitUntilLeader(shard); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification(); + + ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + new WriteModification(TestModel.TEST_PATH, writeData).apply(modification); + MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification); + + String txId = "tx1"; + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true); + + shard.tell(readyMessage, getRef()); + + expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); + assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{ + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testReadyLocalTransactionWithThreePhaseCommit"); + + waitUntilLeader(shard); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification(); + + ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + new WriteModification(TestModel.TEST_PATH, writeData).apply(modification); + MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification); + + String txId = "tx1"; + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false); + + shard.tell(readyMessage, getRef()); + + expectMsgClass(ReadyTransactionReply.class); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + // Send the CanCommitTransaction message. + + shard.tell(new CommitTransaction(txId).toSerializable(), getRef()); + expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); + assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCommitWithPersistenceDisabled() throws Throwable { dataStoreContextBuilder.persistent(false); @@ -1077,12 +1325,15 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(cohort).preCommit(); inOrder.verify(cohort).commit(); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); + ShardStats shardStats = expectMsgClass(duration, ShardStats.class); + // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1,shardStats.getCommittedTransactionsCount()); // Commit index should not advance because this does not go into the journal - assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex()); + assertEquals(-1, shardStats.getCommitIndex()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1133,12 +1384,15 @@ public class ShardTest extends AbstractShardTest { inOrder.verify(cohort).preCommit(); inOrder.verify(cohort).commit(); + shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef()); + ShardStats shardStats = expectMsgClass(duration, ShardStats.class); + // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1, shardStats.getCommittedTransactionsCount()); // Commit index should advance as we do not have an empty modification - assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); + assertEquals(0, shardStats.getCommitIndex()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1632,7 +1886,7 @@ public class ShardTest extends AbstractShardTest { @Test public void testTransactionCommitQueueCapacityExceeded() throws Throwable { - dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1); + dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2); new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), @@ -1672,9 +1926,11 @@ public class ShardTest extends AbstractShardTest { cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); + // The 3rd Tx should exceed queue capacity and fail. + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, cohort3, modification3, true, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.class); + expectMsgClass(duration, akka.actor.Status.Failure.class); // canCommit 1st Tx. @@ -1694,6 +1950,125 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable { + dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); + + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionCommitWithPriorExpiredCohortEntries"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3); + + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // All Tx's are readied. We'll send canCommit for the last one but not the others. The others + // should expire from the queue and the last one should be processed. + + shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable { + dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1); + + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testTransactionCommitWithSubsequentExpiredCohortEntry"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // CanCommit the first one so it's the current in-progress CohortEntry. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Ready the second Tx. + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // Ready the third Tx. + + String transactionID3 = "tx3"; + DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification(); + new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)) + .apply(modification3); + ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true); + + shard.tell(readyMessage, getRef()); + + // Commit the first Tx. After completing, the second should expire from the queue and the third + // Tx committed. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + // Expect commit reply from the third Tx. + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); + assertNotNull(TestModel.TEST2_PATH + " not found", node); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCanCommitBeforeReadyFailure() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -1985,14 +2360,27 @@ public class ShardTest extends AbstractShardTest { shard.tell(new RegisterRoleChangeListener(), listener); - // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore - // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary - // sleep. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class); + + ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", true, + leaderStateChanged.getLocalShardDataTree().isPresent()); + assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), + leaderStateChanged.getLocalShardDataTree().get()); - List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + MessageCollectorActor.clearMessages(listener); - assertEquals(1, allMatching.size()); + // Force a leader change + + shard.tell(new RequestVote(10000, "member2", 50, 50), getRef()); + + leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, + ShardLeaderStateChanged.class); + assertEquals("getLocalShardDataTree present", false, + leaderStateChanged.getLocalShardDataTree().isPresent()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }; }