X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=3d28672c9fd08906e2fba0a063dbcf10e4326ced;hp=e3b82df1743e75c433cec193d54a2cbfbd696319;hb=9f17976f66bc0d3b58bcb96f325a241e34871d54;hpb=fe45ad923c8cf83d730cf4d576c310967afabdf3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e3b82df174..3d28672c9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -8,6 +8,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -24,7 +25,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -87,25 +86,32 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); @Test public void testRegisterChangeListener() throws Exception { @@ -337,8 +343,8 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); - InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -377,12 +383,27 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testRecovery() throws Exception { + public void testApplyStateWithCandidatePayload() throws Exception { - // Set up the InMemorySnapshotStore. + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); + + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + DataTreeCandidatePayload.create(candidate))); + + shard.underlyingActor().onReceiveCommand(applyState); + + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); - InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); - testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { + DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -391,6 +412,55 @@ public class ShardTest extends AbstractShardTest { InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( SerializationUtils.serializeNormalizedNode(root), Collections.emptyList(), 0, 1, -1, -1)); + return testStore; + } + + private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException { + source.validate(mod); + final DataTreeCandidate candidate = source.prepare(mod); + source.commit(candidate); + return DataTreeCandidatePayload.create(candidate); + } + + @Test + public void testDataTreeCandidateRecovery() throws Exception { + // Set up the InMemorySnapshotStore. + final DataTree source = setupInMemorySnapshotStore(); + + final DataTreeModification writeMod = source.takeSnapshot().newModification(); + writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Set up the InMemoryJournal. + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + + // Add some ModificationPayload entries + for (int i = 1; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + + final DataTreeModification mod = source.takeSnapshot().newModification(); + mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + payloadForModification(source, mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyJournalEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testModicationRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + setupInMemorySnapshotStore(); // Set up the InMemoryJournal. @@ -418,7 +488,7 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { + private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -427,7 +497,6 @@ public class ShardTest extends AbstractShardTest { return new ModificationPayload(compMod); } - @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -439,23 +508,23 @@ public class ShardTest extends AbstractShardTest { // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -469,7 +538,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.class)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -484,11 +553,11 @@ public class ShardTest extends AbstractShardTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); + cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -595,18 +664,7 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + verifyOuterListEntry(shard, 1); verifyLastApplied(shard, 2); @@ -614,36 +672,36 @@ public class ShardTest extends AbstractShardTest { }}; } - private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, - NormalizedNode data, boolean ready) { - return newBatchedModifications(transactionID, null, path, data, ready); + private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + NormalizedNode data, boolean ready, boolean doCommitOnReady) { + return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); } - private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, - YangInstanceIdentifier path, NormalizedNode data, boolean ready) { + private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); batched.setReady(ready); + batched.setDoCommitOnReady(doCommitOnReady); return batched; } - @SuppressWarnings("unchecked") @Test - public void testMultipleBatchedModifications() throws Throwable { + public void testBatchedModificationsWithNoCommitOnReady() throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testMultipleBatchedModifications"); + "testBatchedModificationsWithNoCommitOnReady"); waitUntilLeader(shard); final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -657,18 +715,18 @@ public class ShardTest extends AbstractShardTest { // Send a BatchedModifications to start a transaction. shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); // Send a couple more BatchedModifications. shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef()); + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -690,23 +748,85 @@ public class ShardTest extends AbstractShardTest { // Verify data in the data store. - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + verifyOuterListEntry(shard, 1); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } + @Test + public void testBatchedModificationsWithCommitOnReady() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsWithCommitOnReady"); + + waitUntilLeader(shard); + + final String transactionID = "tx"; + FiniteDuration duration = duration("5 seconds"); + + final AtomicReference mockCohort = new AtomicReference<>(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { + if(mockCohort.get() == null) { + mockCohort.set(createDelegatingMockCohort("cohort", actual)); + } + + return mockCohort.get(); + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + + // Send a BatchedModifications to start a transaction. + + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Send a couple more BatchedModifications. + + shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + InOrder inOrder = inOrder(mockCohort.get()); + inOrder.verify(mockCohort.get()).canCommit(); + inOrder.verify(mockCohort.get()).preCommit(); + inOrder.verify(mockCohort.get()).commit(); + + // Verify data in the data store. + + verifyOuterListEntry(shard, 1); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @SuppressWarnings("unchecked") + private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { + NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + Object entry = ((Iterable)outerList.getValue()).iterator().next(); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue()); + } + @Test public void testBatchedModificationsOnTransactionChain() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -727,7 +847,7 @@ public class ShardTest extends AbstractShardTest { ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, - containerNode, true), getRef()); + containerNode, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain. @@ -765,6 +885,8 @@ public class ShardTest extends AbstractShardTest { final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); new ShardTestKit(getSystem()) {{ Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), @@ -800,6 +922,45 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{ + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testForwardedReadyTransactionWithImmediateCommit"); + + waitUntilLeader(shard); + + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, + TestModel.TEST_PATH, containerNode, modification); + + FiniteDuration duration = duration("5 seconds"); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + + NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); + assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + @Test public void testCommitWithPersistenceDisabled() throws Throwable { dataStoreContextBuilder.persistent(false); @@ -810,14 +971,14 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); // Setup a simulated transactions with a mock cohort. String transactionID = "tx"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -826,7 +987,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -853,6 +1014,25 @@ public class ShardTest extends AbstractShardTest { }}; } + private static DataTreeCandidateTip mockCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); + doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + + private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + @Test public void testCommitWhenTransactionHasNoModifications(){ // Note that persistence is enabled which would normally result in the entry getting written to the journal @@ -867,10 +1047,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -878,7 +1059,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -922,10 +1103,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -933,7 +1115,7 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -973,19 +1155,20 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 simulated transactions with mock cohorts. The first one fails in the + // Setup 2 simulated transactions with mock cohorts. The first one fails in the // commit phase. String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -995,11 +1178,11 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1052,37 +1235,66 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - String transactionID = "tx1"; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); - doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); + final Timeout timeout = new Timeout(duration); // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - // Send the CanCommitTransaction message. + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + // Send the CanCommitTransaction message for the first Tx. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the CommitTransaction message. This should send back an error - // for preCommit failure. + // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and + // processed after the first Tx completes. - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + Future canCommitFuture = Patterns.ask(shard, + new CanCommitTransaction(transactionID2).toSerializable(), timeout); + + // Send the CommitTransaction message for the first Tx. This should send back an error + // and trigger the 2nd Tx to proceed. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); - InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); + // Wait for the 2nd Tx to complete the canCommit phase. + + final CountDownLatch latch = new CountDownLatch(1); + canCommitFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable t, final Object resp) { + latch.countDown(); + } + }, getSystem().dispatcher()); + + assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS)); + + InOrder inOrder = inOrder(cohort1, cohort2); + inOrder.verify(cohort1).canCommit(); + inOrder.verify(cohort1).preCommit(); + inOrder.verify(cohort2).canCommit(); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; @@ -1099,23 +1311,183 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - String transactionID = "tx1"; + String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, akka.actor.Status.Failure.class); + + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("getCanCommit", true, reply.getCanCommit()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testCanCommitPhaseFalseResponse() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCanCommitPhaseFalseResponse"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + String transactionID1 = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort, modification, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("getCanCommit", false, reply.getCanCommit()); + + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, false), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + reply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("getCanCommit", true, reply.getCanCommit()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testImmediateCommitWithCanCommitPhaseFailure"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + String transactionID1 = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); + doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + expectMsgClass(duration, akka.actor.Status.Failure.class); + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testImmediateCommitWithCanCommitPhaseFalseResponse"); + + waitUntilLeader(shard); + + final FiniteDuration duration = duration("5 seconds"); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, akka.actor.Status.Failure.class); + + // Send another can commit to ensure the failed one got cleaned up. + + reset(cohort); + + String transactionID2 = "tx2"; + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort, modification, true, true), getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @@ -1130,13 +1502,13 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final String transactionID = "tx1"; - Function> preCommit = - new Function>() { + Function> preCommit = + new Function>() { @Override - public ListenableFuture apply(final DOMStoreThreePhaseCommitCohort cohort) { + public ListenableFuture apply(final ShardDataTreeCohort cohort) { ListenableFuture preCommitFuture = cohort.preCommit(); // Simulate an AbortTransaction message occurring during replication, after @@ -1154,12 +1526,12 @@ public class ShardTest extends AbstractShardTest { }; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); + cohort, modification, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -1194,7 +1566,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, @@ -1204,7 +1576,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -1216,7 +1588,7 @@ public class ShardTest extends AbstractShardTest { MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, listNodePath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), modification2); @@ -1224,11 +1596,11 @@ public class ShardTest extends AbstractShardTest { // Ready the Tx's shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1241,6 +1613,11 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + // Try to commit the 1st Tx - should fail as it's not the current Tx. + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, akka.actor.Status.Failure.class); + // Commit the 2nd Tx. shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef()); @@ -1266,37 +1643,37 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); // Ready the Tx's shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); + cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. @@ -1344,13 +1721,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1360,11 +1737,11 @@ public class ShardTest extends AbstractShardTest { // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); + cohort1, modification1, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); + cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1506,29 +1883,29 @@ public class ShardTest extends AbstractShardTest { /** * This test simply verifies that the applySnapShot logic will work * @throws ReadFailedException + * @throws DataValidationFailedException */ @Test - public void testInMemoryDataStoreRestore() throws ReadFailedException { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor()); + public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); - store.onGlobalContextUpdated(SCHEMA_CONTEXT); - - DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); + DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - commitTransaction(putTransaction); + commitTransaction(store, putTransaction); - NormalizedNode expected = readStore(store); + NormalizedNode expected = readStore(store, YangInstanceIdentifier.builder().build()); - DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + DataTreeModification writeTransaction = store.takeSnapshot().newModification(); writeTransaction.delete(YangInstanceIdentifier.builder().build()); writeTransaction.write(YangInstanceIdentifier.builder().build(), expected); - commitTransaction(writeTransaction); + commitTransaction(store, writeTransaction); - NormalizedNode actual = readStore(store); + NormalizedNode actual = readStore(store, YangInstanceIdentifier.builder().build()); assertEquals(expected, actual); } @@ -1637,15 +2014,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private void commitTransaction(final DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - ListenableFuture future = - commitCohort.preCommit(); - try { - future.get(); - future = commitCohort.commit(); - future.get(); - } catch (InterruptedException | ExecutionException e) { - } + private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException { + modification.ready(); + store.validate(modification); + store.commit(store.prepare(modification)); } }