X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fcompat%2FPreLithiumShardTest.java;h=5fb02619e9b7e859e4fabc0d678d848aa79d82ae;hb=7ec296abdea94bbdc336276731ee545a2fb13e7c;hp=a2309be48f4e09f5b102418611bf7d035943a589;hpb=9fb1df14f2dc885fee1dce821b753cc99af6e54f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index a2309be48f..5fb02619e9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -20,7 +20,6 @@ import akka.pattern.Patterns; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; -import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -38,11 +37,9 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; @@ -50,7 +47,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; @@ -64,6 +60,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import scala.concurrent.Future; @@ -76,7 +73,7 @@ import scala.concurrent.duration.FiniteDuration; */ public class PreLithiumShardTest extends AbstractShardTest { - private CompositeModificationPayload newLegacyPayload(final Modification... mods) { + private static CompositeModificationPayload newLegacyPayload(final Modification... mods) { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -85,7 +82,7 @@ public class PreLithiumShardTest extends AbstractShardTest { return new CompositeModificationPayload(compMod.toSerializable()); } - private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { + private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -94,15 +91,6 @@ public class PreLithiumShardTest extends AbstractShardTest { return new CompositeModificationByteStringPayload(compMod.toSerializable()); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { - MutableCompositeModification compMod = new MutableCompositeModification(); - for(Modification mod: mods) { - compMod.addModification(mod); - } - - return new ModificationPayload(compMod); - } - @Test public void testApplyHelium2VersionSnapshot() throws Exception { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), @@ -110,7 +98,7 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); - DataTree store = InMemoryDataTreeFactory.getInstance().create(); + DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -120,11 +108,10 @@ public class PreLithiumShardTest extends AbstractShardTest { NormalizedNodeMessages.Container encode = codec.encode(expected); - ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create( - encode.getNormalizedNode().toByteString().toByteArray(), - Collections.emptyList(), 1, 2, 3, 4)); + Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(), + Collections.emptyList(), 1, 2, 3, 4); - shard.underlyingActor().onReceiveCommand(applySnapshot); + shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState()); NormalizedNode actual = readStore(shard, root); @@ -135,26 +122,30 @@ public class PreLithiumShardTest extends AbstractShardTest { @Test public void testHelium2VersionApplyStateLegacy() throws Exception { + new ShardTestKit(getSystem()) {{ + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), + "testHelium2VersionApplyStateLegacy"); - TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy"); + waitUntilLeader(shard); - NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, - newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); - shard.underlyingActor().onReceiveCommand(applyState); + shard.underlyingActor().onReceiveCommand(applyState); - NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); - assertEquals("Applied state", node, actual); + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; } @Test public void testHelium2VersionRecovery() throws Exception { - DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); + DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -166,9 +157,12 @@ public class PreLithiumShardTest extends AbstractShardTest { getNormalizedNode().toByteString().toByteArray(), Collections.emptyList(), 0, 1, -1, -1)); + InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " + + "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1")); + // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); @@ -183,7 +177,7 @@ public class PreLithiumShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyPayload(mod))); } @@ -194,11 +188,11 @@ public class PreLithiumShardTest extends AbstractShardTest { .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); - InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyByteStringPayload(mod))); } - InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries)); + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries)); testRecovery(listEntryKeys); } @@ -244,8 +238,7 @@ public class PreLithiumShardTest extends AbstractShardTest { // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION, - cohort1, modification1, true, false), getRef()); + shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -259,12 +252,10 @@ public class PreLithiumShardTest extends AbstractShardTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION, - cohort2, modification2, true, false), getRef()); + shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION, - cohort3, modification3, true, false), getRef()); + shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and