Remove snapshot after startup and fix related bug
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index e19135cbcb0710f676470da9af0ded40565a48ca..6565c59b5f6ad4c9639d4d001f57c76fa2872f91 100644 (file)
@@ -65,12 +65,12 @@ import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
@@ -85,6 +85,7 @@ import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolic
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -307,9 +308,6 @@ public class RaftActorTest extends AbstractActorTest {
         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
         mockRaftActor.handleRecover(applyJournalEntries);
 
-        ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
-        mockRaftActor.handleRecover(applyLogEntries);
-
         DeleteEntries deleteEntries = new DeleteEntries(1);
         mockRaftActor.handleRecover(deleteEntries);
 
@@ -327,7 +325,6 @@ public class RaftActorTest extends AbstractActorTest {
         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
-        verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
@@ -364,11 +361,11 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
-        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
+        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotSuccess);
 
-        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
+        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotFailure);
 
@@ -442,9 +439,10 @@ public class RaftActorTest extends AbstractActorTest {
                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                         new MockRaftActorContext.MockPayload("F"));
 
-                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+                final Identifier id = new MockIdentifier("apply-state");
+                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
 
-                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
 
             }
         };
@@ -504,7 +502,7 @@ public class RaftActorTest extends AbstractActorTest {
             Follower follower = new Follower(raftActor.getRaftActorContext()) {
                 @Override
                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
-                    leaderId = newLeaderId;
+                    setLeaderId(newLeaderId);
                     setLeaderPayloadVersion(newLeaderVersion);
                     return this;
                 }
@@ -656,12 +654,12 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-4")));
 
                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
-                        leader, Runtime.getRuntime().totalMemory());
+                        Runtime.getRuntime().totalMemory());
 
                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -764,7 +762,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
+                followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@@ -843,7 +841,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // simulate a real snapshot
-                leaderActor.onReceiveCommand(new SendHeartBeat());
+                leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
@@ -948,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
             // Persist another entry (this will cause a CaptureSnapshot to be triggered
-            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+            leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
+                new MockRaftActorContext.MockPayload("duh"));
 
             // Now send a CaptureSnapshotReply
             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
@@ -961,40 +960,6 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
-    @Test
-    public void testRaftActorOnRecoverySnapshot() throws Exception {
-        TEST_LOG.info("testRaftActorOnRecoverySnapshot");
-
-        new JavaTestKit(getSystem()) {{
-                String persistenceId = factory.generateActorId("follower-");
-
-                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
-                // Set the heartbeat interval high to essentially disable election otherwise the test
-                // may fail if the actor is switched to Leader
-                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
-                ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
-
-                // Create mock ReplicatedLogEntry
-                ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
-                        new MockRaftActorContext.MockPayload("F", 1));
-
-                InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
-
-                TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses, config));
-
-                MockRaftActor mockRaftActor = ref.underlyingActor();
-
-                mockRaftActor.waitForRecoveryComplete();
-
-                mockRaftActor.waitForInitializeBehaviorComplete();
-
-                verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
-            }};
-    }
-
     @Test
     public void testSwitchBehavior(){
         String persistenceId = factory.generateActorId("leader-");
@@ -1132,9 +1097,6 @@ public class RaftActorTest extends AbstractActorTest {
 
         mockRaftActor.waitForRecoveryComplete();
 
-        // Wait for snapshot after recovery
-        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
-
         mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
 
         raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
@@ -1294,7 +1256,8 @@ public class RaftActorTest extends AbstractActorTest {
 
         mockRaftActor.waitForRecoveryComplete();
 
-        verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class));
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
 
         RaftActorContext context = mockRaftActor.getRaftActorContext();
         assertEquals("Journal log size", 1, context.getReplicatedLog().size());
@@ -1332,4 +1295,39 @@ public class RaftActorTest extends AbstractActorTest {
 
         TEST_LOG.info("testNonVotingOnRecovery ending");
     }
+
+    @Test
+    public void testLeaderTransitioning() throws Exception {
+        TEST_LOG.info("testLeaderTransitioning starting");
+
+        TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
+                0L, -1L, (short)1), ActorRef.noSender());
+        LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                notifierActor, LeaderStateChanged.class);
+        assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
+
+        MessageCollectorActor.clearMessages(notifierActor);
+
+        raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
+
+        leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+        assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
+        assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
+
+        TEST_LOG.info("testLeaderTransitioning ending");
+    }
 }