Fix intermittent failure in ClusterAdminRpcServiceTest.testModuleShardLeaderMovement
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MigratedMessagesTest.java
index 08967d2b3f61f86c4dd71c83295a42fc121c9bcc..460dd4a445a8306731499e205b1207c563e1e110 100644 (file)
@@ -14,18 +14,31 @@ import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -100,7 +113,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
     }
@@ -119,7 +132,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
     }
@@ -131,7 +144,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
         String persistenceId = factory.generateActorId("test-actor-");
 
         InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
-        InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1,
+        InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1,
                 new MockRaftActorContext.MockPayload("A")));
         InMemoryJournal.addEntry(persistenceId, 3,
                 new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
@@ -142,25 +155,36 @@ public class MigratedMessagesTest extends AbstractActorTest {
             assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
             assertEquals("getLastIndex", 0, snapshot.getLastIndex());
             assertEquals("getLastTerm", 1, snapshot.getLastTerm());
-        });
+        }, ByteState.empty());
 
         TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
     }
 
     @Test
     public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
+        TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
         String id = factory.generateActorId("test-actor-");
+
+        InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
+        InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
+        InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
+
         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
 
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
-                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public void applySnapshot(Snapshot.State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
 
@@ -175,6 +199,29 @@ public class MigratedMessagesTest extends AbstractActorTest {
 
         List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
         assertEquals("Snapshots", 0, snapshots.size());
+
+        TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
+    }
+
+    @Test
+    public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
+        MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A");
+        InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
+                0, 1, expPayload));
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+            assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size());
+            assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
+            assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
+            assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
+        }, ByteState.empty());
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
     }
 
     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
@@ -191,7 +238,7 @@ public class MigratedMessagesTest extends AbstractActorTest {
                 new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
 
         InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
-        InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
+        InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig));
 
         TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
             persistent, snapshot -> {
@@ -199,14 +246,53 @@ public class MigratedMessagesTest extends AbstractActorTest {
                 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
                 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
                         new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
-            });
+            }, ByteState.empty());
 
         return actor;
     }
 
+    @Test
+    public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception {
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
+        final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
+
+        org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot
+            .create(SerializationUtils.serialize((Serializable) snapshotData),
+                Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))),
+                6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList(
+                        new ServerInfo(persistenceId, true), new ServerInfo("2", false))));
+        InMemorySnapshotStore.addSnapshot(persistenceId, legacy);
+
+        doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
+            assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex());
+            assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm());
+            assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
+            assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
+            assertEquals("getState", snapshotState, snapshot.getState());
+            assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(),
+                    snapshot.getUnAppliedEntries().size());
+            assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(),
+                    snapshot.getUnAppliedEntries().get(0).getTerm());
+            assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(),
+                    snapshot.getUnAppliedEntries().get(0).getIndex());
+            assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(),
+                    snapshot.getUnAppliedEntries().get(0).getData());
+            assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor());
+            assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm());
+            assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()),
+                    Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig()));
+        }, snapshotState);
+
+        TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending");
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
-            Consumer<Snapshot> snapshotVerifier) {
+            Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
         InMemorySnapshotStore.addSnapshotSavedLatch(id);
         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -214,17 +300,23 @@ public class MigratedMessagesTest extends AbstractActorTest {
 
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
-                actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+            public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+                actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
+            }
+
+            @Override
+            public void applySnapshot(State newState) {
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public State deserializeSnapshot(ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
 
         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
-                .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props()
+                .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
+                .peerAddresses(ImmutableMap.of("peer", "")).props()
                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();