X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fcompat%2FPreLithiumShardTest.java;h=c9797b5414589018b83baeffeb307b5fec81585f;hp=a2309be48f4e09f5b102418611bf7d035943a589;hb=7232cedfe7b702305b51da43d7302b7cc1014d7e;hpb=9f17976f66bc0d3b58bcb96f325a241e34871d54 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index a2309be48f..c9797b5414 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -8,41 +8,19 @@ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.inOrder; -import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; -import akka.dispatch.Dispatchers; -import akka.dispatch.OnComplete; -import akka.pattern.Patterns; import akka.testkit.TestActorRef; -import akka.util.Timeout; -import com.google.common.base.Optional; -import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; -import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.Shard; -import org.opendaylight.controller.cluster.datastore.ShardDataTree; -import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; @@ -50,7 +28,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; @@ -59,15 +36,11 @@ import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; /** * Unit tests for backwards compatibility with pre-Lithium versions. @@ -76,8 +49,8 @@ import scala.concurrent.duration.FiniteDuration; */ public class PreLithiumShardTest extends AbstractShardTest { - private CompositeModificationPayload newLegacyPayload(final Modification... mods) { - MutableCompositeModification compMod = new MutableCompositeModification(); + private static CompositeModificationPayload newLegacyPayload(final Modification... mods) { + MutableCompositeModification compMod = new MutableCompositeModification(DataStoreVersions.HELIUM_2_VERSION); for(Modification mod: mods) { compMod.addModification(mod); } @@ -85,8 +58,8 @@ public class PreLithiumShardTest extends AbstractShardTest { return new CompositeModificationPayload(compMod.toSerializable()); } - private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { - MutableCompositeModification compMod = new MutableCompositeModification(); + private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { + MutableCompositeModification compMod = new MutableCompositeModification(DataStoreVersions.HELIUM_2_VERSION); for(Modification mod: mods) { compMod.addModification(mod); } @@ -94,15 +67,6 @@ public class PreLithiumShardTest extends AbstractShardTest { return new CompositeModificationByteStringPayload(compMod.toSerializable()); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { - MutableCompositeModification compMod = new MutableCompositeModification(); - for(Modification mod: mods) { - compMod.addModification(mod); - } - - return new ModificationPayload(compMod); - } - @Test public void testApplyHelium2VersionSnapshot() throws Exception { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), @@ -110,7 +74,7 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); - DataTree store = InMemoryDataTreeFactory.getInstance().create(); + DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -120,11 +84,10 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNodeMessages.Container encode = codec.encode(expected); - ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( - encode.getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 1, 2, 3, 4)); + Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 1, 2, 3, 4); - shard.underlyingActor().onReceiveCommand(applySnapshot); + shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState()); NormalizedNode actual = readStore(shard, root); @@ -135,26 +98,30 @@ public class PreLithiumShardTest extends AbstractShardTest { @Test public void testHelium2VersionApplyStateLegacy() throws Exception { + new ShardTestKit(getSystem()) {{ + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), + "testHelium2VersionApplyStateLegacy"); - TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy"); + waitUntilLeader(shard); - NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); - shard.underlyingActor().onReceiveCommand(applyState); + shard.underlyingActor().onReceiveCommand(applyState); - NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); - assertEquals("Applied state", node, actual); + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; } @Test public void testHelium2VersionRecovery() throws Exception { - DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -166,9 +133,12 @@ public class PreLithiumShardTest extends AbstractShardTest { getNormalizedNode().toByteString().toByteArray(), Collections.emptyList(), 0, 1, -1, -1)); + InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " + + "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1")); + // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); @@ -183,7 +153,7 @@ public class PreLithiumShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyPayload(mod))); } @@ -194,199 +164,12 @@ public class PreLithiumShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyByteStringPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries)); testRecovery(listEntryKeys); } - - @SuppressWarnings({ "unchecked" }) - @Test - public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable { - new ShardTestKit(getSystem()) {{ - final TestActorRef shard = TestActorRef.create(getSystem(), - newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testPreLithiumConcurrentThreePhaseCommits"); - - waitUntilLeader(shard); - - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - - ShardDataTree dataStore = shard.underlyingActor().getDataStore(); - - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification3); - - long timeoutSec = 5; - final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); - final Timeout timeout = new Timeout(duration); - - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. - - shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION, - cohort1, modification1, true, false), getRef()); - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); - - // Send the CanCommitTransaction message for the first Tx. - - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); - CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - // Send the ForwardedReadyTransaction for the next 2 Tx's. - - shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION, - cohort2, modification2, true, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION, - cohort3, modification3, true, false), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and - // processed after the first Tx completes. - - Future canCommitFuture1 = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); - - Future canCommitFuture2 = Patterns.ask(shard, - new CanCommitTransaction(transactionID3).toSerializable(), timeout); - - // Send the CommitTransaction message for the first Tx. After it completes, it should - // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd. - - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - - // Wait for the next 2 Tx's to complete. - - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch commitLatch = new CountDownLatch(2); - - class OnFutureComplete extends OnComplete { - private final Class expRespType; - - OnFutureComplete(final Class expRespType) { - this.expRespType = expRespType; - } - - @Override - public void onComplete(final Throwable error, final Object resp) { - if(error != null) { - caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error)); - } else { - try { - assertEquals("Commit response type", expRespType, resp.getClass()); - onSuccess(resp); - } catch (Exception e) { - caughtEx.set(e); - } - } - } - - void onSuccess(final Object resp) throws Exception { - } - } - - class OnCommitFutureComplete extends OnFutureComplete { - OnCommitFutureComplete() { - super(CommitTransactionReply.SERIALIZABLE_CLASS); - } - - @Override - public void onComplete(final Throwable error, final Object resp) { - super.onComplete(error, resp); - commitLatch.countDown(); - } - } - - class OnCanCommitFutureComplete extends OnFutureComplete { - private final String transactionID; - - OnCanCommitFutureComplete(final String transactionID) { - super(CanCommitTransactionReply.SERIALIZABLE_CLASS); - this.transactionID = transactionID; - } - - @Override - void onSuccess(final Object resp) throws Exception { - CanCommitTransactionReply canCommitReply = - CanCommitTransactionReply.fromSerializable(resp); - assertEquals("Can commit", true, canCommitReply.getCanCommit()); - - Future commitFuture = Patterns.ask(shard, - new CommitTransaction(transactionID).toSerializable(), timeout); - commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher()); - } - } - - canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), - getSystem().dispatcher()); - - canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), - getSystem().dispatcher()); - - boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); - - if(caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Commits complete", true, done); - - InOrder inOrder = inOrder(cohort1, cohort2, cohort3); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); - inOrder.verify(cohort2).preCommit(); - inOrder.verify(cohort2).commit(); - inOrder.verify(cohort3).canCommit(); - inOrder.verify(cohort3).preCommit(); - inOrder.verify(cohort3).commit(); - - // Verify data in the data store. - - NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); - assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", - outerList.getValue() instanceof Iterable); - Object entry = ((Iterable)outerList.getValue()).iterator().next(); - assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", - entry instanceof MapEntryNode); - MapEntryNode mapEntry = (MapEntryNode)entry; - Optional> idLeaf = - mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); - assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); - assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - - verifyLastApplied(shard, 2); - - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); - }}; - } }