Bug 2264: Use streaming for snapshots
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 14fc3a12bd9b97bf891586eb3902c502fa18e142..94b9698abf3a06f41527a83e5cec2461b2658266 100644 (file)
@@ -17,7 +17,9 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
+import akka.japi.Procedure;
 import akka.pattern.Patterns;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -43,6 +45,7 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -67,6 +70,7 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo
 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -364,13 +368,41 @@ public class ShardTest extends AbstractActorTest {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
-        NormalizedNodeToNodeCodec codec =
-            new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
-        writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
-        NormalizedNode<?,?> expected = readStore(shard, root);
+        NormalizedNode<?,?> expected = readStore(store, root);
+
+        ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
+                SerializationUtils.serializeNormalizedNode(expected),
+                Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+
+        shard.underlyingActor().onReceiveCommand(applySnapshot);
+
+        NormalizedNode<?,?> actual = readStore(shard, root);
+
+        assertEquals("Root node", expected, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testApplyHelium2VersionSnapshot() throws Exception {
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+                "testApplySnapshot");
+
+        NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
+
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
+        NormalizedNode<?,?> expected = readStore(store, root);
 
         NormalizedNodeMessages.Container encode = codec.encode(expected);
 
@@ -382,7 +414,7 @@ public class ShardTest extends AbstractActorTest {
 
         NormalizedNode<?,?> actual = readStore(shard, root);
 
-        assertEquals(expected, actual);
+        assertEquals("Root node", expected, actual);
 
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
@@ -423,7 +455,6 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    @SuppressWarnings("serial")
     @Test
     public void testRecovery() throws Exception {
 
@@ -432,20 +463,13 @@ public class ShardTest extends AbstractActorTest {
         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
 
-        DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
-        writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
+        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-        DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
-        NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
+        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
 
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
-                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
-                        root).
-                                getNormalizedNode().toByteString().toByteArray(),
-                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+                SerializationUtils.serializeNormalizedNode(root),
+                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
 
         // Set up the InMemoryJournal.
 
@@ -455,31 +479,63 @@ public class ShardTest extends AbstractActorTest {
 
         int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        int i = 1;
 
-        // Add some of the legacy CompositeModificationPayload
-        for(; i <= 2; i++) {
+        // Add some ModificationPayload entries
+        for(int i = 1; i <= nListEntries; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newLegacyPayload(mod)));
+                    newModificationPayload(mod)));
         }
 
-        // Add some of the legacy CompositeModificationByteStringPayload
-        for(; i <= 5; i++) {
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+                new ApplyLogEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
+
+    @Test
+    public void testHelium2VersionRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+
+        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
+        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+
+        writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
+
+        InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
+                new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
+                                getNormalizedNode().toByteString().toByteArray(),
+                                Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+
+        // Set up the InMemoryJournal.
+
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+                  new WriteModification(TestModel.OUTER_LIST_PATH,
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+
+        int nListEntries = 16;
+        Set<Integer> listEntryKeys = new HashSet<>();
+        int i = 1;
+
+        // Add some CompositeModificationPayload entries
+        for(; i <= 8; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newLegacyByteStringPayload(mod)));
+                    newLegacyPayload(mod)));
         }
 
-        // Add some of the ModificationPayload
+        // Add some CompositeModificationByteStringPayload entries
         for(; i <= nListEntries; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
@@ -487,16 +543,22 @@ public class ShardTest extends AbstractActorTest {
             Modification mod = new MergeModification(path,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newModificationPayload(mod)));
+                    newLegacyByteStringPayload(mod)));
         }
 
-        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
-                new ApplyLogEntries(nListEntries));
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
 
+    private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
         // Create the actor and wait for recovery complete.
 
+        int nListEntries = listEntryKeys.size();
+
         final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
+        @SuppressWarnings("serial")
         Creator<Shard> creator = new Creator<Shard>() {
             @Override
             public Shard create() throws Exception {
@@ -1319,19 +1381,54 @@ public class ShardTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCreateSnapshot() throws IOException, InterruptedException {
-            testCreateSnapshot(true, "testCreateSnapshot");
+    public void testCreateSnapshot() throws Exception {
+        testCreateSnapshot(true, "testCreateSnapshot");
     }
 
     @Test
-    public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
+    public void testCreateSnapshotWithNonPersistentData() throws Exception {
         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
     }
 
     @SuppressWarnings("serial")
-    public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
-        final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
-                shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
+    public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+
+        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
+            DataPersistenceProvider delegate;
+
+            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+                this.delegate = delegate;
+            }
+
+            @Override
+            public boolean isRecoveryApplicable() {
+                return delegate.isRecoveryApplicable();
+            }
+
+            @Override
+            public <T> void persist(T o, Procedure<T> procedure) {
+                delegate.persist(o, procedure);
+            }
+
+            @Override
+            public void saveSnapshot(Object o) {
+                savedSnapshot.set(o);
+                delegate.saveSnapshot(o);
+            }
+
+            @Override
+            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+                delegate.deleteSnapshots(criteria);
+            }
+
+            @Override
+            public void deleteMessages(long sequenceNumber) {
+                delegate.deleteMessages(sequenceNumber);
+            }
+        }
+
+        dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
@@ -1340,6 +1437,18 @@ public class ShardTest extends AbstractActorTest {
                 public Shard create() throws Exception {
                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
+
+                        DelegatingPersistentDataProvider delegating;
+
+                        @Override
+                        protected DataPersistenceProvider persistence() {
+                            if(delegating == null) {
+                                delegating = new DelegatingPersistentDataProvider(super.persistence());
+                            }
+
+                            return delegating;
+                        }
+
                         @Override
                         protected void commitSnapshot(final long sequenceNumber) {
                             super.commitSnapshot(sequenceNumber);
@@ -1354,16 +1463,40 @@ public class ShardTest extends AbstractActorTest {
 
             waitUntilLeader(shard);
 
-            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
+
+            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
+            shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
+            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                    savedSnapshot.get() instanceof Snapshot);
+
+            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
             latch.set(new CountDownLatch(1));
-            shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
+            savedSnapshot.set(null);
+
+            shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
+            assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
+                    savedSnapshot.get() instanceof Snapshot);
+
+            verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
+
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }
+
+        private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
+
+            NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
+            assertEquals("Root node", expectedRoot, actual);
+
         }};
     }
 
@@ -1470,7 +1603,12 @@ public class ShardTest extends AbstractActorTest {
 
     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
+        return readStore(shard.underlyingActor().getDataStore(), id);
+    }
+
+    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
+            throws ExecutionException, InterruptedException {
+        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
             transaction.read(id);
@@ -1483,9 +1621,14 @@ public class ShardTest extends AbstractActorTest {
         return node;
     }
 
-    private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
-        throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
+    static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+        writeToStore(shard.underlyingActor().getDataStore(), id, node);
+    }
+
+    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 
         transaction.write(id, node);