X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=e3b82df1743e75c433cec193d54a2cbfbd696319;hp=adc7f4706c1e871f6ddab28bf8364e06a975d0fc;hb=c31a6fcf9fb070d4419ca4c32d8b531fdcb5030d;hpb=a51d14246d418570ac98663d286292deefa3555a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index adc7f4706c..e3b82df174 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -16,9 +16,8 @@ import akka.actor.Props; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; -import akka.japi.Procedure; import akka.pattern.Patterns; -import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.SaveSnapshotSuccess; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -41,6 +40,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -69,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -99,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -120,7 +122,7 @@ public class ShardTest extends AbstractShardTest { "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, - dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef()); + dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterChangeListenerReply.class); @@ -158,8 +160,12 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { + // Use a non persistent provider because this test actually invokes persist on the journal + // this will cause all other messages to not be queued properly after that. + // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when + // it does do a persist) return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { @@ -208,7 +214,7 @@ public class ShardTest extends AbstractShardTest { onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); // Now send the RegisterChangeListener and wait for the reply. - shard.tell(new RegisterChangeListener(path, dclActor.path(), + shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), @@ -431,42 +437,42 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - final String transactionID1 = "tx1"; - final String transactionID2 = "tx2"; - final String transactionID3 = "tx3"; + // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - final AtomicReference mockCohort1 = new AtomicReference<>(); - final AtomicReference mockCohort2 = new AtomicReference<>(); - final AtomicReference mockCohort3 = new AtomicReference<>(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) { - if(transactionID.equals(transactionID1)) { - mockCohort1.set(createDelegatingMockCohort("cohort1", actual)); - return mockCohort1.get(); - } else if(transactionID.equals(transactionID2)) { - mockCohort2.set(createDelegatingMockCohort("cohort2", actual)); - return mockCohort2.get(); - } else { - mockCohort3.set(createDelegatingMockCohort("cohort3", actual)); - return mockCohort3.get(); - } - } - }; + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification3); long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Send a BatchedModifications message for the first transaction. + // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class); - assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath()); - assertEquals("getNumBatched", 1, batchedReply.getNumBatched()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( + expectMsgClass(duration, ReadyTransactionReply.class)); + assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); // Send the CanCommitTransaction message for the first Tx. @@ -475,16 +481,15 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send BatchedModifications for the next 2 Tx's. + // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and // processed after the first Tx completes. @@ -577,16 +582,16 @@ public class ShardTest extends AbstractShardTest { assertEquals("Commits complete", true, done); - InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get()); - inOrder.verify(mockCohort1.get()).canCommit(); - inOrder.verify(mockCohort1.get()).preCommit(); - inOrder.verify(mockCohort1.get()).commit(); - inOrder.verify(mockCohort2.get()).canCommit(); - inOrder.verify(mockCohort2.get()).preCommit(); - inOrder.verify(mockCohort2.get()).commit(); - inOrder.verify(mockCohort3.get()).canCommit(); - inOrder.verify(mockCohort3.get()).preCommit(); - inOrder.verify(mockCohort3.get()).commit(); + InOrder inOrder = inOrder(cohort1, cohort2, cohort3); + inOrder.verify(cohort1).canCommit(); + inOrder.verify(cohort1).preCommit(); + inOrder.verify(cohort1).commit(); + inOrder.verify(cohort2).canCommit(); + inOrder.verify(cohort2).preCommit(); + inOrder.verify(cohort2).commit(); + inOrder.verify(cohort3).canCommit(); + inOrder.verify(cohort3).preCommit(); + inOrder.verify(cohort3).commit(); // Verify data in the data store. @@ -664,7 +669,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -723,7 +728,7 @@ public class ShardTest extends AbstractShardTest { YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, containerNode, true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain. @@ -805,14 +810,24 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + + // Setup a simulated transactions with a mock cohort. + String transactionID = "tx"; + MutableCompositeModification modification = new MutableCompositeModification(); + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + TestModel.TEST_PATH, containerNode, modification); + FiniteDuration duration = duration("5 seconds"); - // Send a BatchedModifications to start a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -826,6 +841,11 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); @@ -859,7 +879,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -914,7 +934,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -933,7 +953,7 @@ public class ShardTest extends AbstractShardTest { // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); // Commit index should advance as we do not have an empty modification assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); @@ -953,40 +973,34 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 mock cohorts. The first one fails in the commit phase. + // Setup 2 simulated transactions with mock cohorts. The first one fails in the + // commit phase. - final String transactionID1 = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); - final String transactionID2 = "tx2"; - final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return transactionID1.equals(transactionID) ? cohort1 : cohort2; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Send BatchedModifications to start and ready each transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1039,27 +1053,19 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); String transactionID = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return cohort; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - FiniteDuration duration = duration("5 seconds"); - // Send BatchedModifications to start and ready a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1094,24 +1100,16 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); String transactionID = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return cohort; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - - // Send BatchedModifications to start and ready a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1155,9 +1153,14 @@ public class ShardTest extends AbstractShardTest { } }; - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), + modification, preCommit); + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( @@ -1191,26 +1194,42 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Create and ready the 1st Tx - will timeout + // Create 1st Tx - will timeout String transactionID1 = "tx1"; - shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification1); - // Create and ready the 2nd Tx + // Create 2nd Tx - String transactionID2 = "tx2"; + String transactionID2 = "tx3"; + MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - shard.tell(newBatchedModifications(transactionID2, listNodePath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + listNodePath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), + modification2); + + // Ready the Tx's + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1247,23 +1266,38 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); - // Send a BatchedModifications to start transactions and ready them. + // Ready the Tx's - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. @@ -1308,37 +1342,30 @@ public class ShardTest extends AbstractShardTest { // Setup 2 simulated transactions with mock cohorts. The first one will be aborted. - final String transactionID1 = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); - final String transactionID2 = "tx2"; - final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return transactionID1.equals(transactionID) ? cohort1 : cohort2; - } - }; + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - // Send BatchedModifications to start and ready each transaction. - - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); - - shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1384,68 +1411,52 @@ public class ShardTest extends AbstractShardTest { @SuppressWarnings("serial") public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ - final AtomicReference savedSnapshot = new AtomicReference<>(); - class DelegatingPersistentDataProvider implements DataPersistenceProvider { - DataPersistenceProvider delegate; - - DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { - this.delegate = delegate; - } + final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - @Override - public boolean isRecoveryApplicable() { - return delegate.isRecoveryApplicable(); - } - - @Override - public void persist(T o, Procedure procedure) { - delegate.persist(o, procedure); + final AtomicReference savedSnapshot = new AtomicReference<>(); + class TestPersistentDataProvider extends DelegatingPersistentDataProvider { + TestPersistentDataProvider(DataPersistenceProvider delegate) { + super(delegate); } @Override public void saveSnapshot(Object o) { savedSnapshot.set(o); - delegate.saveSnapshot(o); - } - - @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { - delegate.deleteSnapshots(criteria); - } - - @Override - public void deleteMessages(long sequenceNumber) { - delegate.deleteMessages(sequenceNumber); + super.saveSnapshot(o); } } dataStoreContextBuilder.persistent(persistent); new ShardTestKit(getSystem()) {{ - final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + class TestShard extends Shard { - DelegatingPersistentDataProvider delegating; + protected TestShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + super(name, peerAddresses, datastoreContext, schemaContext); + setPersistence(new TestPersistentDataProvider(super.persistence())); + } - @Override - protected DataPersistenceProvider persistence() { - if(delegating == null) { - delegating = new DelegatingPersistentDataProvider(super.persistence()); - } + @Override + public void handleCommand(Object message) { + super.handleCommand(message); - return delegating; - } + if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) { + latch.get().countDown(); + } + } - @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); - } - }; + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + } + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new TestShard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT); } }; @@ -1458,8 +1469,9 @@ public class ShardTest extends AbstractShardTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); - shard.tell(capture, getRef()); + // Trigger creation of a snapshot by ensuring + RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1471,7 +1483,7 @@ public class ShardTest extends AbstractShardTest { latch.set(new CountDownLatch(1)); savedSnapshot.set(null); - shard.tell(capture, getRef()); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1540,14 +1552,14 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard1 = TestActorRef.create(getSystem(), persistentProps, "testPersistence1"); - assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable()); shard1.tell(PoisonPill.getInstance(), ActorRef.noSender()); TestActorRef shard2 = TestActorRef.create(getSystem(), nonPersistentProps, "testPersistence2"); - assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable()); shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); @@ -1563,19 +1575,19 @@ public class ShardTest extends AbstractShardTest { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext"); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); waitUntilLeader(shard); shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", false, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender()); assertEquals("isRecoveryApplicable", true, - shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable()); + shard.underlyingActor().persistence().isRecoveryApplicable()); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }};