X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=3d28672c9fd08906e2fba0a063dbcf10e4326ced;hp=72f672794ab16c9bf89920bcf855cb72fd4bcc63;hb=9f17976f66bc0d3b58bcb96f325a241e34871d54;hpb=228af4aa1ef1a802fd24e7e010f3bba959ee03dd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 72f672794a..3d28672c9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -25,7 +25,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; @@ -34,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -88,25 +86,32 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); @Test public void testRegisterChangeListener() throws Exception { @@ -338,8 +343,8 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplySnapshot"); - InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -378,12 +383,27 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testRecovery() throws Exception { + public void testApplyStateWithCandidatePayload() throws Exception { - // Set up the InMemorySnapshotStore. + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); + + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + DataTreeCandidatePayload.create(candidate))); + + shard.underlyingActor().onReceiveCommand(applyState); + + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } - InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null); - testStore.onGlobalContextUpdated(SCHEMA_CONTEXT); + DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { + DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -392,6 +412,55 @@ public class ShardTest extends AbstractShardTest { InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( SerializationUtils.serializeNormalizedNode(root), Collections.emptyList(), 0, 1, -1, -1)); + return testStore; + } + + private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException { + source.validate(mod); + final DataTreeCandidate candidate = source.prepare(mod); + source.commit(candidate); + return DataTreeCandidatePayload.create(candidate); + } + + @Test + public void testDataTreeCandidateRecovery() throws Exception { + // Set up the InMemorySnapshotStore. + final DataTree source = setupInMemorySnapshotStore(); + + final DataTreeModification writeMod = source.takeSnapshot().newModification(); + writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Set up the InMemoryJournal. + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + + // Add some ModificationPayload entries + for (int i = 1; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + + final DataTreeModification mod = source.takeSnapshot().newModification(); + mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + payloadForModification(source, mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyJournalEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testModicationRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + setupInMemorySnapshotStore(); // Set up the InMemoryJournal. @@ -419,7 +488,7 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { + private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -428,7 +497,6 @@ public class ShardTest extends AbstractShardTest { return new ModificationPayload(compMod); } - @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -440,23 +508,23 @@ public class ShardTest extends AbstractShardTest { // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -604,12 +672,12 @@ public class ShardTest extends AbstractShardTest { }}; } - private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); } - private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); @@ -630,10 +698,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -698,10 +766,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -744,7 +812,7 @@ public class ShardTest extends AbstractShardTest { } @SuppressWarnings("unchecked") - private void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { + private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", @@ -817,6 +885,8 @@ public class ShardTest extends AbstractShardTest { final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); new ShardTestKit(getSystem()) {{ Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), @@ -861,12 +931,12 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -901,14 +971,14 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); // Setup a simulated transactions with a mock cohort. String transactionID = "tx"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -944,6 +1014,25 @@ public class ShardTest extends AbstractShardTest { }}; } + private static DataTreeCandidateTip mockCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); + doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + + private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + @Test public void testCommitWhenTransactionHasNoModifications(){ // Note that persistence is enabled which would normally result in the entry getting written to the journal @@ -958,10 +1047,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1013,10 +1103,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1069,14 +1160,15 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1145,13 +1237,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1221,7 +1313,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1269,7 +1361,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1319,7 +1411,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1338,6 +1430,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1361,7 +1458,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1380,6 +1477,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1400,13 +1502,13 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final String transactionID = "tx1"; - Function> preCommit = - new Function>() { + Function> preCommit = + new Function>() { @Override - public ListenableFuture apply(final DOMStoreThreePhaseCommitCohort cohort) { + public ListenableFuture apply(final ShardDataTreeCohort cohort) { ListenableFuture preCommitFuture = cohort.preCommit(); // Simulate an AbortTransaction message occurring during replication, after @@ -1424,7 +1526,7 @@ public class ShardTest extends AbstractShardTest { }; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); @@ -1464,7 +1566,7 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, @@ -1474,7 +1576,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -1486,7 +1588,7 @@ public class ShardTest extends AbstractShardTest { MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, listNodePath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), modification2); @@ -1541,23 +1643,23 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); // Ready the Tx's @@ -1619,13 +1721,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1781,29 +1883,29 @@ public class ShardTest extends AbstractShardTest { /** * This test simply verifies that the applySnapShot logic will work * @throws ReadFailedException + * @throws DataValidationFailedException */ @Test - public void testInMemoryDataStoreRestore() throws ReadFailedException { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor()); - - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); - DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); + DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - commitTransaction(putTransaction); + commitTransaction(store, putTransaction); - NormalizedNode expected = readStore(store); + NormalizedNode expected = readStore(store, YangInstanceIdentifier.builder().build()); - DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + DataTreeModification writeTransaction = store.takeSnapshot().newModification(); writeTransaction.delete(YangInstanceIdentifier.builder().build()); writeTransaction.write(YangInstanceIdentifier.builder().build(), expected); - commitTransaction(writeTransaction); + commitTransaction(store, writeTransaction); - NormalizedNode actual = readStore(store); + NormalizedNode actual = readStore(store, YangInstanceIdentifier.builder().build()); assertEquals(expected, actual); } @@ -1912,15 +2014,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private void commitTransaction(final DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - ListenableFuture future = - commitCohort.preCommit(); - try { - future.get(); - future = commitCohort.commit(); - future.get(); - } catch (InterruptedException | ExecutionException e) { - } + private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException { + modification.ready(); + store.validate(modification); + store.commit(store.prepare(modification)); } }