Bug 2268: Serialize ApppendEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 168eb3e5f22c9752dcbe089fe2e87393713d2650..b31cb621b3576b1a9bcbaff321465d4bd186674e 100644 (file)
@@ -1,10 +1,24 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -18,6 +32,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -28,27 +43,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
-    private ActorRef leaderActor =
+    private final ActorRef leaderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
-    private ActorRef senderActor =
+    private final ActorRef senderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
 
     @Test
@@ -64,12 +66,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -89,6 +91,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -114,6 +117,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -124,7 +128,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                            followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
@@ -142,6 +146,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -166,6 +171,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef raftActor = getTestActor();
@@ -192,6 +198,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ExpectMsg<String>(duration("1 seconds"),
                             "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 if (in instanceof ApplyState) {
                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
@@ -265,8 +272,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
-                followerActor, AppendEntries.SERIALIZABLE_CLASS);
+            AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+                followerActor, AppendEntries.class);
 
             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
                 "received", aeproto);
@@ -479,6 +486,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             final String out =
                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                     // do not put code outside this method, will run afterwards
+                    @Override
                     protected String match(Object in) {
                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
                             InstallSnapshot is = (InstallSnapshot)
@@ -559,16 +567,166 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             assertTrue(raftBehavior instanceof Leader);
 
-            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-            assertEquals(leader.followerToLog.size(), 1);
-            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+            assertEquals(0, leader.followerSnapshotSize());
+            assertEquals(1, leader.followerLogSize());
+            assertNotNull(leader.getFollower(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
 
+    @Test
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
+        }};
+    }
+
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                TestActorRef<MessageCollectorActor> followerActor =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
+                    }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
+
+                MockLeader leader = new MockLeader(actorContext);
+
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
+
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                int hashCode = installSnapshot.getData().hashCode();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
     @Test
     public void testFollowerToSnapshotLogic() {
 
@@ -626,6 +784,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return createActorContext(leaderActor);
     }
 
+    @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
         return new MockRaftActorContext("test", getSystem(), actorRef);
     }
@@ -717,14 +876,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntriesMessages.AppendEntries appendEntries =
-                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntries.class);
 
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
             AppendEntriesReply appendEntriesReply =
@@ -786,14 +944,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntriesMessages.AppendEntries appendEntries =
-                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntries.class);
 
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
             AppendEntriesReply appendEntriesReply =
@@ -941,10 +1098,76 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
 
                 assertEquals(RaftState.Leader, raftActorBehavior.state());
+            }};
+    }
 
+    @Test
+    public void testIsolatedLeaderCheckNoFollowers() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef leaderActor = getTestActor();
 
-            }};
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            Leader leader = new Leader(leaderActorContext);
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue(behavior instanceof Leader);
+        }};
+    }
+
+    @Test
+    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor1 = getTestActor();
+            ActorRef followerActor2 = getTestActor();
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
 
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerActor1.path().toString());
+            peerAddresses.put("follower-2", followerActor2.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.stopIsolatedLeaderCheckSchedule();
+
+            leader.markFollowerActive("follower-1");
+            leader.markFollowerActive("follower-2");
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+                behavior instanceof Leader);
+
+            // kill 1 follower and verify if that got killed
+            final JavaTestKit probe = new JavaTestKit(getSystem());
+            probe.watch(followerActor1);
+            followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg1.getActor(), followerActor1);
+
+            leader.markFollowerInActive("follower-1");
+            leader.markFollowerActive("follower-2");
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+                behavior instanceof Leader);
+
+            // kill 2nd follower and leader should change to Isolated leader
+            followerActor2.tell(PoisonPill.getInstance(), null);
+            probe.watch(followerActor2);
+            followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg2.getActor(), followerActor2);
+
+            leader.markFollowerInActive("follower-2");
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+                behavior instanceof IsolatedLeader);
+
+        }};
     }
 
     class MockLeader extends Leader {
@@ -961,15 +1184,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
             fts = new FollowerToSnapshot(bs);
-            mapFollowerToSnapshot.put(followerId, fts);
-
+            setFollowerSnapshot(followerId, fts);
         }
     }
 
     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
-        private long electionTimeOutIntervalMillis;
-        private int snapshotChunkSize;
+        private final long electionTimeOutIntervalMillis;
+        private final int snapshotChunkSize;
 
         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
             super();