BUG 2185 : Introduce the SwitchBehavior message
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index e5c8677b0466fa0b282a413ef2291dd0318ce01e..c73344c44aa0bd2b28e7a8b69e05ef7a84996b99 100644 (file)
@@ -1,9 +1,17 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -17,6 +25,7 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -43,7 +52,6 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -52,6 +60,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -251,6 +261,10 @@ public class RaftActorTest extends AbstractActorTest {
         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
         mockRaftActor.handleRecover(updateElectionTerm);
 
+        org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
+                new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
+        mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
+
         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
         verify(mockSupport).handleRecoveryMessage(same(logEntry));
         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
@@ -258,6 +272,7 @@ public class RaftActorTest extends AbstractActorTest {
         verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
+        verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm));
     }
 
     @Test
@@ -413,15 +428,18 @@ public class RaftActorTest extends AbstractActorTest {
                     notifierActor, LeaderStateChanged.class);
 
             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+            assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
 
             notifierActor.underlyingActor().clear();
 
             MockRaftActor raftActor = raftActorRef.underlyingActor();
             final String newLeaderId = "new-leader";
+            final short newLeaderVersion = 6;
             Follower follower = new Follower(raftActor.getRaftActorContext()) {
                 @Override
                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
                     leaderId = newLeaderId;
+                    setLeaderPayloadVersion(newLeaderVersion);
                     return this;
                 }
             };
@@ -443,6 +461,15 @@ public class RaftActorTest extends AbstractActorTest {
             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
             assertEquals(persistenceId, leaderStateChange.getMemberId());
             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
+            assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
+
+            notifierActor.underlyingActor().clear();
+
+            raftActor.handleCommand("any");
+
+            Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
+            leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertNull(leaderStateChange);
         }};
     }
 
@@ -542,13 +569,13 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
                 //fake snapshot on index 5
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 //fake snapshot on index 6
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
@@ -565,10 +592,10 @@ public class RaftActorTest extends AbstractActorTest {
                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
                         leader, Runtime.getRuntime().totalMemory());
 
-                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+                assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -579,7 +606,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
 
                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
                 assertEquals(2, leaderActor.getReplicatedLog().size());
                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
 
@@ -645,7 +672,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                         new MockRaftActorContext.MockPayload("foo-6"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
                 assertEquals(7, followerActor.getReplicatedLog().size());
 
                 //fake snapshot on index 7
@@ -656,7 +683,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
                 assertEquals(8, followerActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
@@ -669,10 +696,10 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+                assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
-                followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
+                followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@@ -684,7 +711,7 @@ public class RaftActorTest extends AbstractActorTest {
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                 // send an additional entry 8 with leaderCommit = 7
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
 
                 // 7 and 8, as lastapplied is 7
                 assertEquals(2, followerActor.getReplicatedLog().size());
@@ -742,12 +769,12 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
@@ -760,7 +787,7 @@ public class RaftActorTest extends AbstractActorTest {
 
 
                 //reply from a slow follower does not initiate a fake snapshot
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
@@ -770,12 +797,12 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+                assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
                 assertEquals(0, leaderActor.getReplicatedLog().size());
             }
         };
@@ -792,7 +819,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
 
-            Map<String, String> peerAddresses = new HashMap<>();
+            Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
                     MockRaftActor.props(persistenceId, peerAddresses,
@@ -814,15 +841,16 @@ public class RaftActorTest extends AbstractActorTest {
             leaderActor.setCurrentBehavior(leader);
             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-            // Persist another entry (this will cause a CaptureSnapshot to be triggered
-            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+            // Simulate an install snaphost to a follower.
+            leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
+                    leaderActor.getReplicatedLog().last(), -1, "member1");
 
             // Now send a CaptureSnapshotReply
             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
 
             // Trimming log in this scenario is a no-op
             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+            assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(-1, leader.getReplicatedToAllIndex());
 
         }};
@@ -839,7 +867,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
 
-            Map<String, String> peerAddresses = new HashMap<>();
+            Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
                     MockRaftActor.props(persistenceId, peerAddresses,
@@ -865,12 +893,56 @@ public class RaftActorTest extends AbstractActorTest {
 
             // Trimming log in this scenario is a no-op
             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+            assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(3, leader.getReplicatedToAllIndex());
 
         }};
     }
 
+    @Test
+    public void testSwitchBehavior(){
+        String persistenceId = factory.generateActorId("leader-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setSnapshotBatchCount(5);
+
+        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+
+        Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
+
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+                MockRaftActor.props(persistenceId, peerAddresses,
+                        Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+        MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
+        leaderActor.handleRecover(RecoveryCompleted.getInstance());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
+
+        assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+
+    }
+
     public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;