X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractShardTest.java;h=9d313a5a7897b39991efa0fdbe4cec5d32980170;hp=1100f3a7fa2c0fd584dc57618a6cd54339aa4b60;hb=ffa332690d87fa2975fbc3acb16a8044b0d28125;hpb=107324809285bfbb9890cba38ffa18390f8de4bd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 1100f3a7fa..9d313a5a78 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -11,8 +11,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -23,7 +27,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; -import java.util.Collections; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -36,8 +39,11 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -50,7 +56,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -71,6 +76,8 @@ public abstract class AbstractShardTest extends AbstractActorTest{ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000). shardHeartbeatIntervalInMillis(100); + protected final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + @Before public void setUp() { InMemorySnapshotStore.clear(); @@ -81,6 +88,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ public void tearDown() { InMemorySnapshotStore.clear(); InMemoryJournal.clear(); + actorFactory.close(); } protected DatastoreContext newDatastoreContext() { @@ -88,23 +96,26 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } protected Props newShardProps() { - return Shard.props(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT); + return newShardBuilder().props(); + } + + protected Shard.Builder newShardBuilder() { + return Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).schemaContext(SCHEMA_CONTEXT); } - protected void testRecovery(Set listEntryKeys) throws Exception { + protected void testRecovery(final Set listEntryKeys) throws Exception { // Create the actor and wait for recovery complete. - int nListEntries = listEntryKeys.size(); + final int nListEntries = listEntryKeys.size(); final CountDownLatch recoveryComplete = new CountDownLatch(1); @SuppressWarnings("serial") + final Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + return new Shard(newShardBuilder()) { @Override protected void onRecoveryComplete() { try { @@ -117,25 +128,25 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } }; - TestActorRef shard = TestActorRef.create(getSystem(), + final TestActorRef shard = TestActorRef.create(getSystem(), Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), "testRecovery"); assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS)); // Verify data in the data store. - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + final NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", outerList.getValue() instanceof Iterable); - for(Object entry: (Iterable) outerList.getValue()) { + for(final Object entry: (Iterable) outerList.getValue()) { assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = + final MapEntryNode mapEntry = (MapEntryNode)entry; + final Optional> idLeaf = mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - Object value = idLeaf.get().getValue(); + final Object value = idLeaf.get().getValue(); assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value, listEntryKeys.remove(value)); } @@ -155,7 +166,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - protected void verifyLastApplied(TestActorRef shard, long expectedValue) { + protected void verifyLastApplied(final TestActorRef shard, final long expectedValue) { long lastApplied = -1; for(int i = 0; i < 20 * 5; i++) { lastApplied = shard.underlyingActor().getShardMBean().getLastApplied(); @@ -179,9 +190,9 @@ public abstract class AbstractShardTest extends AbstractActorTest{ final MutableCompositeModification modification, final Function> preCommit) { - ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null); + final ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null); tx.getSnapshot().write(path, data); - ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); + final ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); modification.addModification(new WriteModification(path, data)); @@ -196,7 +207,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, final ShardDataTreeCohort actual, final Function> preCommit) { - ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName); + final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName); doAnswer(new Answer>() { @Override @@ -240,18 +251,72 @@ public abstract class AbstractShardTest extends AbstractActorTest{ return cohort; } - public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) - throws ExecutionException, InterruptedException { - return readStore(shard.underlyingActor().getDataStore().getDataTree(), id); + protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort, + String transactionID, + MutableCompositeModification modification, + boolean doCommitOnReady) { + if(remoteReadWriteTransaction){ + return prepareForwardedReadyTransaction(cohort, transactionID, CURRENT_VERSION, + doCommitOnReady); + } else { + setupCohortDecorator(shard, cohort); + return prepareBatchedModifications(transactionID, modification, doCommitOnReady); + } } - public static NormalizedNode readStore(final DataTree store, final YangInstanceIdentifier id) { - DataTreeSnapshot transaction = store.takeSnapshot(); + static ShardDataTreeTransactionParent newShardDataTreeTransactionParent(ShardDataTreeCohort cohort) { + ShardDataTreeTransactionParent mockParent = mock(ShardDataTreeTransactionParent.class); + doReturn(cohort).when(mockParent).finishTransaction(any(ReadWriteShardDataTreeTransaction.class)); + doNothing().when(mockParent).abortTransaction(any(AbstractShardDataTreeTransaction.class)); + return mockParent; + } + + protected ForwardedReadyTransaction prepareForwardedReadyTransaction(ShardDataTreeCohort cohort, + String transactionID, short version, boolean doCommitOnReady) { + return new ForwardedReadyTransaction(transactionID, version, + new ReadWriteShardDataTreeTransaction(newShardDataTreeTransactionParent(cohort), transactionID, + mock(DataTreeModification.class)), true, doCommitOnReady); + } + + protected Object prepareReadyTransactionMessage(boolean remoteReadWriteTransaction, Shard shard, ShardDataTreeCohort cohort, + String transactionID, + MutableCompositeModification modification) { + return prepareReadyTransactionMessage(remoteReadWriteTransaction, shard, cohort, transactionID, modification, false); + } + + protected void setupCohortDecorator(Shard shard, final ShardDataTreeCohort cohort) { + shard.getCommitCoordinator().setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() { + @Override + public ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual) { + return cohort; + } + }); + } + + protected BatchedModifications prepareBatchedModifications(String transactionID, + MutableCompositeModification modification) { + return prepareBatchedModifications(transactionID, modification, false); + } + + private static BatchedModifications prepareBatchedModifications(String transactionID, + MutableCompositeModification modification, + boolean doCommitOnReady) { + final BatchedModifications batchedModifications = new BatchedModifications(transactionID, CURRENT_VERSION, null); + batchedModifications.addModification(modification); + batchedModifications.setReady(true); + batchedModifications.setDoCommitOnReady(doCommitOnReady); + batchedModifications.setTotalMessagesSent(1); + return batchedModifications; + } - Optional> optional = transaction.readNode(id); - NormalizedNode node = optional.isPresent()? optional.get() : null; - return node; + public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) + throws ExecutionException, InterruptedException { + return shard.underlyingActor().getDataStore().readNode(id).orNull(); + } + + public static NormalizedNode readStore(final DataTree store, final YangInstanceIdentifier id) { + return store.takeSnapshot().readNode(id).orNull(); } public static void writeToStore(final TestActorRef shard, final YangInstanceIdentifier id, @@ -261,10 +326,21 @@ public abstract class AbstractShardTest extends AbstractActorTest{ public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id, final NormalizedNode node) throws InterruptedException, ExecutionException { - ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); + final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); transaction.getSnapshot().write(id, node); - ShardDataTreeCohort cohort = transaction.ready(); + final ShardDataTreeCohort cohort = transaction.ready(); + cohort.canCommit().get(); + cohort.preCommit().get(); + cohort.commit(); + } + + public static void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id, + final NormalizedNode node) throws InterruptedException, ExecutionException { + final ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); + + transaction.getSnapshot().merge(id, node); + final ShardDataTreeCohort cohort = transaction.ready(); cohort.canCommit().get(); cohort.preCommit().get(); cohort.commit(); @@ -272,7 +348,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ public static void writeToStore(final DataTree store, final YangInstanceIdentifier id, final NormalizedNode node) throws DataValidationFailedException { - DataTreeModification transaction = store.takeSnapshot().newModification(); + final DataTreeModification transaction = store.takeSnapshot().newModification(); transaction.write(id, node); transaction.ready();