Use YangInstanceIdentifier.EMPTY
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / compat / PreLithiumShardTest.java
index ded5226d30c5f9b3a17531fcdd2f9b30ba9db274..0631b1b1bf871944c1b3bee6947759879c52a8bd 100644 (file)
@@ -8,66 +8,40 @@
 package org.opendaylight.controller.cluster.datastore.compat;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.inOrder;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
-import akka.dispatch.Dispatchers;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
-import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 
 /**
  * Unit tests for backwards compatibility with pre-Lithium versions.
@@ -76,8 +50,8 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class PreLithiumShardTest extends AbstractShardTest {
 
-    private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
+    private static CompositeModificationPayload newLegacyPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification(DataStoreVersions.HELIUM_2_VERSION);
         for(Modification mod: mods) {
             compMod.addModification(mod);
         }
@@ -85,8 +59,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
-    private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
+    private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification(DataStoreVersions.HELIUM_2_VERSION);
         for(Modification mod: mods) {
             compMod.addModification(mod);
         }
@@ -94,15 +68,6 @@ public class PreLithiumShardTest extends AbstractShardTest {
         return new CompositeModificationByteStringPayload(compMod.toSerializable());
     }
 
-    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
-
-        return new ModificationPayload(compMod);
-    }
-
     @Test
     public void testApplyHelium2VersionSnapshot() throws Exception {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
@@ -110,21 +75,20 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
         NormalizedNode<?,?> expected = readStore(store, root);
 
         NormalizedNodeMessages.Container encode = codec.encode(expected);
 
-        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
-                encode.getNormalizedNode().toByteString().toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+        Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
 
-        shard.underlyingActor().onReceiveCommand(applySnapshot);
+        shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
 
         NormalizedNode<?,?> actual = readStore(shard, root);
 
@@ -135,29 +99,32 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
     @Test
     public void testHelium2VersionApplyStateLegacy() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+                    "testHelium2VersionApplyStateLegacy");
 
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy");
+            waitUntilLeader(shard);
 
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
-                newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
+            ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
+                new ReplicatedLogImplEntry(1, 2,
+                    newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
 
-        shard.underlyingActor().onReceiveCommand(applyState);
+            shard.underlyingActor().onReceiveCommand(applyState);
 
-        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
-        assertEquals("Applied state", node, actual);
+            NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+            assertEquals("Applied state", node, actual);
 
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
     }
 
     @Test
     public void testHelium2VersionRecovery() throws Exception {
 
-        // Set up the InMemorySnapshotStore.
-
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
+        testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -168,9 +135,12 @@ public class PreLithiumShardTest extends AbstractShardTest {
                                 getNormalizedNode().toByteString().toByteArray(),
                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
 
+        InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
+                "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
+
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
@@ -185,7 +155,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
                     newLegacyPayload(mod)));
         }
 
@@ -196,199 +166,12 @@ public class PreLithiumShardTest extends AbstractShardTest {
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+            InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
                     newLegacyByteStringPayload(mod)));
         }
 
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
 
         testRecovery(listEntryKeys);
     }
-
-    @SuppressWarnings({ "unchecked" })
-    @Test
-    public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
-        new ShardTestKit(getSystem()) {{
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testConcurrentThreePhaseCommits");
-
-            waitUntilLeader(shard);
-
-            // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
-
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
-
-            String transactionID1 = "tx1";
-            MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
-                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
-
-            String transactionID2 = "tx2";
-            MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
-                    TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                    modification2);
-
-            String transactionID3 = "tx3";
-            MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
-                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
-                    modification3);
-
-            long timeoutSec = 5;
-            final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
-            final Timeout timeout = new Timeout(duration);
-
-            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
-            // by the ShardTransaction.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
-                    cohort1, modification1, true), getRef());
-            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
-                    expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
-            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
-
-            // Send the CanCommitTransaction message for the first Tx.
-
-            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
-            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
-                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
-            assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
-            // Send the ForwardedReadyTransaction for the next 2 Tx's.
-
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
-                    cohort2, modification2, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
-                    cohort3, modification3, true), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
-
-            // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
-            // processed after the first Tx completes.
-
-            Future<Object> canCommitFuture1 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
-
-            Future<Object> canCommitFuture2 = Patterns.ask(shard,
-                    new CanCommitTransaction(transactionID3).toSerializable(), timeout);
-
-            // Send the CommitTransaction message for the first Tx. After it completes, it should
-            // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
-
-            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
-            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
-
-            // Wait for the next 2 Tx's to complete.
-
-            final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
-            final CountDownLatch commitLatch = new CountDownLatch(2);
-
-            class OnFutureComplete extends OnComplete<Object> {
-                private final Class<?> expRespType;
-
-                OnFutureComplete(final Class<?> expRespType) {
-                    this.expRespType = expRespType;
-                }
-
-                @Override
-                public void onComplete(final Throwable error, final Object resp) {
-                    if(error != null) {
-                        caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
-                    } else {
-                        try {
-                            assertEquals("Commit response type", expRespType, resp.getClass());
-                            onSuccess(resp);
-                        } catch (Exception e) {
-                            caughtEx.set(e);
-                        }
-                    }
-                }
-
-                void onSuccess(final Object resp) throws Exception {
-                }
-            }
-
-            class OnCommitFutureComplete extends OnFutureComplete {
-                OnCommitFutureComplete() {
-                    super(CommitTransactionReply.SERIALIZABLE_CLASS);
-                }
-
-                @Override
-                public void onComplete(final Throwable error, final Object resp) {
-                    super.onComplete(error, resp);
-                    commitLatch.countDown();
-                }
-            }
-
-            class OnCanCommitFutureComplete extends OnFutureComplete {
-                private final String transactionID;
-
-                OnCanCommitFutureComplete(final String transactionID) {
-                    super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
-                    this.transactionID = transactionID;
-                }
-
-                @Override
-                void onSuccess(final Object resp) throws Exception {
-                    CanCommitTransactionReply canCommitReply =
-                            CanCommitTransactionReply.fromSerializable(resp);
-                    assertEquals("Can commit", true, canCommitReply.getCanCommit());
-
-                    Future<Object> commitFuture = Patterns.ask(shard,
-                            new CommitTransaction(transactionID).toSerializable(), timeout);
-                    commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
-                }
-            }
-
-            canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
-                    getSystem().dispatcher());
-
-            canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
-                    getSystem().dispatcher());
-
-            boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
-
-            if(caughtEx.get() != null) {
-                throw caughtEx.get();
-            }
-
-            assertEquals("Commits complete", true, done);
-
-            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
-            inOrder.verify(cohort1).canCommit();
-            inOrder.verify(cohort1).preCommit();
-            inOrder.verify(cohort1).commit();
-            inOrder.verify(cohort2).canCommit();
-            inOrder.verify(cohort2).preCommit();
-            inOrder.verify(cohort2).commit();
-            inOrder.verify(cohort3).canCommit();
-            inOrder.verify(cohort3).preCommit();
-            inOrder.verify(cohort3).commit();
-
-            // Verify data in the data store.
-
-            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                    outerList.getValue() instanceof Iterable);
-            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                       entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
-
-            verifyLastLogIndex(shard, 2);
-
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-        }};
-    }
 }