Refactor ShardTest 62/16162/4
authorTom Pantelis <tpanteli@brocade.com>
Sat, 7 Mar 2015 04:25:57 +0000 (23:25 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Sun, 8 Mar 2015 19:50:31 +0000 (15:50 -0400)
Refactored the helium compatibility tests in ShardTest into a new
PreLithiumShardTest class to keep them separate. This will make it
easier to remove the Helium test code when it's no longer needed.
Since it needs the same setup as ShardTest and some of the utility
methods, I moved all the setup and utility methods to an
AbstractShardTest to avoid duplication. This also makes ShardTest
smaller and easier to follow.

I also moved the ShardTransactionHeliumBackwardsCompatibilityTest to
the compat package.

Change-Id: Ie46441adbd0be4c4a655f5db640519705a6ced29
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java with 94% similarity]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
new file mode 100644 (file)
index 0000000..2c41b00
--- /dev/null
@@ -0,0 +1,292 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Abstract base for shard unit tests.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractShardTest extends AbstractActorTest{
+    protected static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
+
+    protected final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
+
+    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
+            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
+            shardHeartbeatIntervalInMillis(100);
+
+    @Before
+    public void setUp() {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
+
+    @After
+    public void tearDown() {
+        InMemorySnapshotStore.clear();
+        InMemoryJournal.clear();
+    }
+
+    protected DatastoreContext newDatastoreContext() {
+        return dataStoreContextBuilder.build();
+    }
+
+    protected Props newShardProps() {
+        return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                newDatastoreContext(), SCHEMA_CONTEXT);
+    }
+
+    protected void testRecovery(Set<Integer> listEntryKeys) throws Exception {
+        // Create the actor and wait for recovery complete.
+
+        int nListEntries = listEntryKeys.size();
+
+        final CountDownLatch recoveryComplete = new CountDownLatch(1);
+
+        @SuppressWarnings("serial")
+        Creator<Shard> creator = new Creator<Shard>() {
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                        newDatastoreContext(), SCHEMA_CONTEXT) {
+                    @Override
+                    protected void onRecoveryComplete() {
+                        try {
+                            super.onRecoveryComplete();
+                        } finally {
+                            recoveryComplete.countDown();
+                        }
+                    }
+                };
+            }
+        };
+
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
+
+        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
+
+        // Verify data in the data store.
+
+        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                outerList.getValue() instanceof Iterable);
+        for(Object entry: (Iterable<?>) outerList.getValue()) {
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                    entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            Object value = idLeaf.get().getValue();
+            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
+                    listEntryKeys.remove(value));
+        }
+
+        if(!listEntryKeys.isEmpty()) {
+            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
+                    listEntryKeys);
+        }
+
+        assertEquals("Last log index", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastLogIndex());
+        assertEquals("Commit index", nListEntries,
+                shard.underlyingActor().getShardMBean().getCommitIndex());
+        assertEquals("Last applied", nListEntries,
+                shard.underlyingActor().getShardMBean().getLastApplied());
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    protected void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
+        for(int i = 0; i < 20 * 5; i++) {
+            long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
+            if(lastLogIndex == expectedValue) {
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
+    }
+
+    protected NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
+            transaction.read(YangInstanceIdentifier.builder().build());
+
+        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+
+        NormalizedNode<?, ?> normalizedNode = optional.get();
+
+        transaction.close();
+
+        return normalizedNode;
+    }
+
+    protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final MutableCompositeModification modification) {
+        return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
+    }
+
+    protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final MutableCompositeModification modification,
+            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
+
+        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
+        tx.write(path, data);
+        DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, tx.ready(), preCommit);
+
+        modification.addModification(new WriteModification(path, data));
+
+        return cohort;
+    }
+
+    protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
+            final DOMStoreThreePhaseCommitCohort actual) {
+        return createDelegatingMockCohort(cohortName, actual, null);
+    }
+
+    protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
+            final DOMStoreThreePhaseCommitCohort actual,
+            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
+        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
+
+        doAnswer(new Answer<ListenableFuture<Boolean>>() {
+            @Override
+            public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
+                return actual.canCommit();
+            }
+        }).when(cohort).canCommit();
+
+        doAnswer(new Answer<ListenableFuture<Void>>() {
+            @Override
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
+                return actual.preCommit();
+            }
+        }).when(cohort).preCommit();
+
+        doAnswer(new Answer<ListenableFuture<Void>>() {
+            @Override
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
+                return actual.commit();
+            }
+        }).when(cohort).commit();
+
+        doAnswer(new Answer<ListenableFuture<Void>>() {
+            @Override
+            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
+                return actual.abort();
+            }
+        }).when(cohort).abort();
+
+        return cohort;
+    }
+
+    public static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
+        return readStore(shard.underlyingActor().getDataStore(), id);
+    }
+
+    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+            transaction.read(id);
+
+        Optional<NormalizedNode<?, ?>> optional = future.get();
+        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
+
+        transaction.close();
+
+        return node;
+    }
+
+    public static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+        writeToStore(shard.underlyingActor().getDataStore(), id, node);
+    }
+
+    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+        transaction.write(id, node);
+
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+    }
+
+    @SuppressWarnings("serial")
+    public static final class DelegatingShardCreator implements Creator<Shard> {
+        private final Creator<Shard> delegate;
+
+        DelegatingShardCreator(final Creator<Shard> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Shard create() throws Exception {
+            return delegate.create();
+        }
+    }
+}
index d930b25..f38b510 100644 (file)
@@ -5,8 +5,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -24,7 +22,6 @@ import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -38,16 +35,10 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -67,7 +58,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
@@ -79,7 +69,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -87,19 +76,13 @@ import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -108,46 +91,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class ShardTest extends AbstractActorTest {
-
-    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
-
-    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
-
-    private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
-            .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
-
-    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
-            shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
-            shardHeartbeatIntervalInMillis(100);
-
-    @Before
-    public void setUp() {
-        Builder newBuilder = DatastoreContext.newBuilder();
-        InMemorySnapshotStore.clear();
-        InMemoryJournal.clear();
-    }
-
-    @After
-    public void tearDown() {
-        InMemorySnapshotStore.clear();
-        InMemoryJournal.clear();
-    }
-
-    private DatastoreContext newDatastoreContext() {
-        return dataStoreContextBuilder.build();
-    }
-
-    private Props newShardProps() {
-        return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
-                newDatastoreContext(), SCHEMA_CONTEXT);
-    }
-
+public class ShardTest extends AbstractShardTest {
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -395,36 +343,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @Test
-    public void testApplyHelium2VersionSnapshot() throws Exception {
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
-                "testApplySnapshot");
-
-        NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
-
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
-
-        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = readStore(store, root);
-
-        NormalizedNodeMessages.Container encode = codec.encode(expected);
-
-        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
-                encode.getNormalizedNode().toByteString().toByteArray(),
-                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
-
-        shard.underlyingActor().onReceiveCommand(applySnapshot);
-
-        NormalizedNode<?,?> actual = readStore(shard, root);
-
-        assertEquals("Root node", expected, actual);
-
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-    }
-
     @Test
     public void testApplyState() throws Exception {
 
@@ -443,24 +361,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @Test
-    public void testApplyStateLegacy() throws Exception {
-
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
-
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
-                newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
-
-        shard.underlyingActor().onReceiveCommand(applyState);
-
-        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
-        assertEquals("Applied state", node, actual);
-
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-    }
-
     @Test
     public void testRecovery() throws Exception {
 
@@ -479,7 +379,7 @@ public class ShardTest extends AbstractActorTest {
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
@@ -503,141 +403,6 @@ public class ShardTest extends AbstractActorTest {
         testRecovery(listEntryKeys);
     }
 
-    @Test
-    public void testHelium2VersionRecovery() throws Exception {
-
-        // Set up the InMemorySnapshotStore.
-
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
-
-        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
-        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
-
-        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
-                                getNormalizedNode().toByteString().toByteArray(),
-                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
-
-        // Set up the InMemoryJournal.
-
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
-                  new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
-
-        int nListEntries = 16;
-        Set<Integer> listEntryKeys = new HashSet<>();
-        int i = 1;
-
-        // Add some CompositeModificationPayload entries
-        for(; i <= 8; i++) {
-            listEntryKeys.add(Integer.valueOf(i));
-            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
-            Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newLegacyPayload(mod)));
-        }
-
-        // Add some CompositeModificationByteStringPayload entries
-        for(; i <= nListEntries; i++) {
-            listEntryKeys.add(Integer.valueOf(i));
-            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
-            Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
-            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newLegacyByteStringPayload(mod)));
-        }
-
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
-
-        testRecovery(listEntryKeys);
-    }
-
-    private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
-        // Create the actor and wait for recovery complete.
-
-        int nListEntries = listEntryKeys.size();
-
-        final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-        @SuppressWarnings("serial")
-        Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
-                        newDatastoreContext(), SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-
-        TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)), "testRecovery");
-
-        assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
-
-        // Verify data in the data store.
-
-        NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
-        assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
-        assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
-                outerList.getValue() instanceof Iterable);
-        for(Object entry: (Iterable<?>) outerList.getValue()) {
-            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
-                    entry instanceof MapEntryNode);
-            MapEntryNode mapEntry = (MapEntryNode)entry;
-            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
-                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
-            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
-            Object value = idLeaf.get().getValue();
-            assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
-                    listEntryKeys.remove(value));
-        }
-
-        if(!listEntryKeys.isEmpty()) {
-            fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
-                    listEntryKeys);
-        }
-
-        assertEquals("Last log index", nListEntries,
-                shard.underlyingActor().getShardMBean().getLastLogIndex());
-        assertEquals("Commit index", nListEntries,
-                shard.underlyingActor().getShardMBean().getCommitIndex());
-        assertEquals("Last applied", nListEntries,
-                shard.underlyingActor().getShardMBean().getLastApplied());
-
-        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
-    }
-
-    private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
-
-        return new CompositeModificationPayload(compMod.toSerializable());
-    }
-
-    private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
-
-        return new CompositeModificationByteStringPayload(compMod.toSerializable());
-    }
-
     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
@@ -647,59 +412,6 @@ public class ShardTest extends AbstractActorTest {
         return new ModificationPayload(compMod);
     }
 
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-            final MutableCompositeModification modification) {
-        return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
-    }
-
-    private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-            final MutableCompositeModification modification,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
-
-        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
-        tx.write(path, data);
-        final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
-        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
-
-        doAnswer(new Answer<ListenableFuture<Boolean>>() {
-            @Override
-            public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
-                return realCohort.canCommit();
-            }
-        }).when(cohort).canCommit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                if(preCommit != null) {
-                    return preCommit.apply(realCohort);
-                } else {
-                    return realCohort.preCommit();
-                }
-            }
-        }).when(cohort).preCommit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return realCohort.commit();
-            }
-        }).when(cohort).commit();
-
-        doAnswer(new Answer<ListenableFuture<Void>>() {
-            @Override
-            public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
-                return realCohort.abort();
-            }
-        }).when(cohort).abort();
-
-        modification.addModification(new WriteModification(path, data));
-
-        return cohort;
-    }
-
     @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
@@ -887,18 +599,6 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
-    private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
-        for(int i = 0; i < 20 * 5; i++) {
-            long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
-            if(lastLogIndex == expectedValue) {
-                break;
-            }
-            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-        }
-
-        assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
-    }
-
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
         dataStoreContextBuilder.persistent(false);
@@ -1567,7 +1267,6 @@ public class ShardTest extends AbstractActorTest {
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
         }};
-
     }
 
     @Test
@@ -1640,21 +1339,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-
-    private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
-            transaction.read(YangInstanceIdentifier.builder().build());
-
-        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
-
-        NormalizedNode<?, ?> normalizedNode = optional.get();
-
-        transaction.close();
-
-        return normalizedNode;
-    }
-
     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
         ListenableFuture<Void> future =
@@ -1666,64 +1350,4 @@ public class ShardTest extends AbstractActorTest {
         } catch (InterruptedException | ExecutionException e) {
         }
     }
-
-    private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
-        return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-            @Override
-            public void onDataChanged(
-                final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-
-            }
-        };
-    }
-
-    static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        return readStore(shard.underlyingActor().getDataStore(), id);
-    }
-
-    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(id);
-
-        Optional<NormalizedNode<?, ?>> optional = future.get();
-        NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
-
-        transaction.close();
-
-        return node;
-    }
-
-    static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
-        writeToStore(shard.underlyingActor().getDataStore(), id, node);
-    }
-
-    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
-        transaction.write(id, node);
-
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
-    }
-
-    @SuppressWarnings("serial")
-    private static final class DelegatingShardCreator implements Creator<Shard> {
-        private final Creator<Shard> delegate;
-
-        DelegatingShardCreator(final Creator<Shard> delegate) {
-            this.delegate = delegate;
-        }
-
-        @Override
-        public Shard create() throws Exception {
-            return delegate.create();
-        }
-    }
 }
index fa15db6..2d1b14c 100644 (file)
@@ -13,6 +13,8 @@ import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.junit.Assert;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
@@ -21,12 +23,9 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-class ShardTestKit extends JavaTestKit {
+public class ShardTestKit extends JavaTestKit {
 
-    ShardTestKit(ActorSystem actorSystem) {
+    protected ShardTestKit(ActorSystem actorSystem) {
         super(actorSystem);
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
new file mode 100644 (file)
index 0000000..ded5226
--- /dev/null
@@ -0,0 +1,394 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.dispatch.Dispatchers;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for backwards compatibility with pre-Lithium versions.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLithiumShardTest extends AbstractShardTest {
+
+    private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationPayload(compMod.toSerializable());
+    }
+
+    private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new ModificationPayload(compMod);
+    }
+
+    @Test
+    public void testApplyHelium2VersionSnapshot() throws Exception {
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+                "testApplyHelium2VersionSnapshot");
+
+        NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?,?> expected = readStore(store, root);
+
+        NormalizedNodeMessages.Container encode = codec.encode(expected);
+
+        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+                encode.getNormalizedNode().toByteString().toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+
+        shard.underlyingActor().onReceiveCommand(applySnapshot);
+
+        NormalizedNode<?,?> actual = readStore(shard, root);
+
+        assertEquals("Root node", expected, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testHelium2VersionApplyStateLegacy() throws Exception {
+
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy");
+
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
+
+        shard.underlyingActor().onReceiveCommand(applyState);
+
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testHelium2VersionRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+
+        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
+
+        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
+                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
+                                getNormalizedNode().toByteString().toByteArray(),
+                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+        // Set up the InMemoryJournal.
+
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+                  new WriteModification(TestModel.OUTER_LIST_PATH,
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+
+        int nListEntries = 16;
+        Set<Integer> listEntryKeys = new HashSet<>();
+        int i = 1;
+
+        // Add some CompositeModificationPayload entries
+        for(; i <= 8; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newLegacyPayload(mod)));
+        }
+
+        // Add some CompositeModificationByteStringPayload entries
+        for(; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newLegacyByteStringPayload(mod)));
+        }
+
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    @Test
+    public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testConcurrentThreePhaseCommits");
+
+            waitUntilLeader(shard);
+
+            // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
+
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
+
+            long timeoutSec = 5;
+            final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
+            final Timeout timeout = new Timeout(duration);
+
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message for the first Tx.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
+            // processed after the first Tx completes.
+
+            Future<Object> canCommitFuture1 = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID2).toSerializable(), timeout);
+
+            Future<Object> canCommitFuture2 = Patterns.ask(shard,
+                    new CanCommitTransaction(transactionID3).toSerializable(), timeout);
+
+            // Send the CommitTransaction message for the first Tx. After it completes, it should
+            // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Wait for the next 2 Tx's to complete.
+
+            final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+            final CountDownLatch commitLatch = new CountDownLatch(2);
+
+            class OnFutureComplete extends OnComplete<Object> {
+                private final Class<?> expRespType;
+
+                OnFutureComplete(final Class<?> expRespType) {
+                    this.expRespType = expRespType;
+                }
+
+                @Override
+                public void onComplete(final Throwable error, final Object resp) {
+                    if(error != null) {
+                        caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
+                    } else {
+                        try {
+                            assertEquals("Commit response type", expRespType, resp.getClass());
+                            onSuccess(resp);
+                        } catch (Exception e) {
+                            caughtEx.set(e);
+                        }
+                    }
+                }
+
+                void onSuccess(final Object resp) throws Exception {
+                }
+            }
+
+            class OnCommitFutureComplete extends OnFutureComplete {
+                OnCommitFutureComplete() {
+                    super(CommitTransactionReply.SERIALIZABLE_CLASS);
+                }
+
+                @Override
+                public void onComplete(final Throwable error, final Object resp) {
+                    super.onComplete(error, resp);
+                    commitLatch.countDown();
+                }
+            }
+
+            class OnCanCommitFutureComplete extends OnFutureComplete {
+                private final String transactionID;
+
+                OnCanCommitFutureComplete(final String transactionID) {
+                    super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
+                    this.transactionID = transactionID;
+                }
+
+                @Override
+                void onSuccess(final Object resp) throws Exception {
+                    CanCommitTransactionReply canCommitReply =
+                            CanCommitTransactionReply.fromSerializable(resp);
+                    assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+                    Future<Object> commitFuture = Patterns.ask(shard,
+                            new CommitTransaction(transactionID).toSerializable(), timeout);
+                    commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
+                }
+            }
+
+            canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
+                    getSystem().dispatcher());
+
+            canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
+                    getSystem().dispatcher());
+
+            boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
+
+            if(caughtEx.get() != null) {
+                throw caughtEx.get();
+            }
+
+            assertEquals("Commits complete", true, done);
+
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
+
+            // Verify data in the data store.
+
+            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
+            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
+                    outerList.getValue() instanceof Iterable);
+            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
+            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
+                       entry instanceof MapEntryNode);
+            MapEntryNode mapEntry = (MapEntryNode)entry;
+            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
+                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
+            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
+            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
+
+            verifyLastLogIndex(shard, 2);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+}
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore;
+package org.opendaylight.controller.cluster.datastore.compat;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -16,6 +16,13 @@ import akka.testkit.TestActorRef;
 import java.util.Collections;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.ShardTest;
+import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -46,7 +53,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testTransactionCommit() throws Exception {
         new ShardTestKit(getSystem()) {{

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.