X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fcompat%2FPreLithiumShardTest.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fcompat%2FPreLithiumShardTest.java;h=1cf45cef01e888504af95dbe4675dcb67d3f858a;hp=5fb02619e9b7e859e4fabc0d678d848aa79d82ae;hb=28b2fd303b8e8bc757de6ead454ae06469113b34;hpb=196fbf9b716ea26740195fd397c1b2550f656638 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 5fb02619e9..1cf45cef01 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -8,36 +8,16 @@ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.inOrder; -import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; -import akka.dispatch.Dispatchers; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Optional; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; -import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.ShardDataTree; -import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; @@ -55,16 +35,11 @@ import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; /** * Unit tests for backwards compatibility with pre-Lithium versions. @@ -196,188 +171,4 @@ public class PreLithiumShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - - @SuppressWarnings({ "unchecked" }) - @Test - public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable { - new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testPreLithiumConcurrentThreePhaseCommits"); - - waitUntilLeader(shard); - - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - - ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification3); - - long timeoutSec = 5; - final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); - final Timeout timeout = new Timeout(duration); - - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. - - shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef()); - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); - - // Send the CanCommitTransaction message for the first Tx. - - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); - CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - // Send the ForwardedReadyTransaction for the next 2 Tx's. - - shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and - // processed after the first Tx completes. - - Future canCommitFuture1 = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); - - Future canCommitFuture2 = Patterns.ask(shard, - new CanCommitTransaction(transactionID3).toSerializable(), timeout); - - // Send the CommitTransaction message for the first Tx. After it completes, it should - // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd. - - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - - // Wait for the next 2 Tx's to complete. - - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch commitLatch = new CountDownLatch(2); - - class OnFutureComplete extends OnComplete { - private final Class expRespType; - - OnFutureComplete(final Class expRespType) { - this.expRespType = expRespType; - } - - @Override - public void onComplete(final Throwable error, final Object resp) { - if(error != null) { - caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error)); - } else { - try { - assertEquals("Commit response type", expRespType, resp.getClass()); - onSuccess(resp); - } catch (Exception e) { - caughtEx.set(e); - } - } - } - - void onSuccess(final Object resp) throws Exception { - } - } - - class OnCommitFutureComplete extends OnFutureComplete { - OnCommitFutureComplete() { - super(CommitTransactionReply.SERIALIZABLE_CLASS); - } - - @Override - public void onComplete(final Throwable error, final Object resp) { - super.onComplete(error, resp); - commitLatch.countDown(); - } - } - - class OnCanCommitFutureComplete extends OnFutureComplete { - private final String transactionID; - - OnCanCommitFutureComplete(final String transactionID) { - super(CanCommitTransactionReply.SERIALIZABLE_CLASS); - this.transactionID = transactionID; - } - - @Override - void onSuccess(final Object resp) throws Exception { - CanCommitTransactionReply canCommitReply = - CanCommitTransactionReply.fromSerializable(resp); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - Future commitFuture = Patterns.ask(shard, - new CommitTransaction(transactionID).toSerializable(), timeout); - commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher()); - } - } - - canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), - getSystem().dispatcher()); - - canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), - getSystem().dispatcher()); - - boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); - - if(caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Commits complete", true, done); - - InOrder inOrder = inOrder(cohort1, cohort2, cohort3); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); - inOrder.verify(cohort2).preCommit(); - inOrder.verify(cohort2).commit(); - inOrder.verify(cohort3).canCommit(); - inOrder.verify(cohort3).preCommit(); - inOrder.verify(cohort3).commit(); - - // Verify data in the data store. - - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - - verifyLastApplied(shard, 2); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - }}; - } }