Merge "BUG 2509 : Removing all journal entries from a Followers in-memory journal...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index b7c371dd39722986b7601bec8cce1e0127ec71d4..a04d6aeb556cd2f84ffb10ac23302c9e5928451b 100644 (file)
@@ -1,27 +1,35 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
@@ -33,20 +41,24 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         return new Follower(actorContext);
     }
 
-    @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), followerActor);
+    @Override protected  RaftActorContext createActorContext() {
+        return createActorContext(followerActor);
+    }
+
+    protected  RaftActorContext createActorContext(ActorRef actorRef){
+        return new MockRaftActorContext("test", getSystem(), actorRef);
     }
 
     @Test
     public void testThatAnElectionTimeoutIsTriggered(){
         new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
+            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
                 protected void run() {
 
                     Follower follower = new Follower(createActorContext(getTestActor()));
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "ElectionTimeout") {
+                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
@@ -69,10 +81,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         Follower follower =
             new Follower(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             follower.handleMessage(followerActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Candidate, raftState);
+        assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
@@ -157,19 +169,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 createActorContext();
 
             context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
+            setLastLogEntry((MockRaftActorContext) context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
 
             List<ReplicatedLogEntry> entries =
                 Arrays.asList(
-                    (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(100, 101,
-                        "foo")
+                        (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
+                                new MockRaftActorContext.MockPayload("foo"))
                 );
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
 
             assertEquals(101L, context.getLastApplied());
@@ -198,7 +212,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // Set the last log entry term for the receiver to be greater than
             // what we will be sending as the prevLogTerm in AppendEntries
             MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
-                setLastLogEntry(context, 20, 0, "");
+                setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
 
             // AppendEntries is now sent with a bigger term
             // this will set the receivers term to be the same as the sender's term
@@ -208,12 +222,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // Also expect an AppendEntriesReply to be sent where success is false
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -259,20 +273,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             MockRaftActorContext.SimpleReplicatedLog log =
                 new MockRaftActorContext.SimpleReplicatedLog();
             log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
             log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
             log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
 
             context.setReplicatedLog(log);
 
             // Prepare the entries to be sent with AppendEntries
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
             entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
 
             // Send appendEntries with the same term as was set on the receiver
             // before the new behavior was created (1 in this case)
@@ -284,12 +298,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
             assertEquals(5, log.last().getIndex() + 1);
             assertNotNull(log.get(3));
             assertNotNull(log.get(4));
@@ -339,20 +353,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             MockRaftActorContext.SimpleReplicatedLog log =
                 new MockRaftActorContext.SimpleReplicatedLog();
             log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
             log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
             log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
 
             context.setReplicatedLog(log);
 
             // Prepare the entries to be sent with AppendEntries
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
+                new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
             entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
+                new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
 
             // Send appendEntries with the same term as was set on the receiver
             // before the new behavior was created (1 in this case)
@@ -364,12 +378,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // The entry at index 2 will be found out-of-sync with the leader
             // and will be removed
@@ -378,13 +392,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertEquals(4, log.last().getIndex() + 1);
             assertNotNull(log.get(2));
 
-
-            assertEquals("one", log.get(1).getData());
+            assertEquals("one", log.get(1).getData().toString());
 
             // Check that the entry at index 2 has the new data
-            assertEquals("two-1", log.get(2).getData());
+            assertEquals("two-1", log.get(2).getData().toString());
+
+            assertEquals("three", log.get(3).getData().toString());
 
-            assertEquals("three", log.get(3).getData());
             assertNotNull(log.get(3));
 
             // Also expect an AppendEntriesReply to be sent where success is false
@@ -407,4 +421,304 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleAppendEntriesPreviousLogEntryMissing(){
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                    createActorContext();
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                    new MockRaftActorContext.SimpleReplicatedLog();
+            log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+            log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
+            log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+            AppendEntries appendEntries =
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftActorBehavior raftBehavior =
+                    behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftBehavior);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                    "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(false, out);
+
+        }};
+
+    }
+
+    @Test
+    public void testHandleAppendAfterInstallingSnapshot(){
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                    createActorContext();
+
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                    new MockRaftActorContext.SimpleReplicatedLog();
+
+            // Set up a log as if it has been snapshotted
+            log.setSnapshotIndex(3);
+            log.setSnapshotTerm(1);
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+            AppendEntries appendEntries =
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftActorBehavior raftBehavior =
+                    behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftBehavior);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                    "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(true, out);
+
+        }};
+
+    }
+
+
+    /**
+     * This test verifies that when InstallSnapshot is received by
+     * the follower its applied correctly.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleInstallSnapshot() throws Exception {
+        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(
+                MessageCollectorActor.class));
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext(getRef());
+
+            Follower follower = (Follower)createBehavior(context);
+
+            HashMap<String, String> followerSnapshot = new HashMap<>();
+            followerSnapshot.put("1", "A");
+            followerSnapshot.put("2", "B");
+            followerSnapshot.put("3", "C");
+
+            ByteString bsSnapshot  = toByteString(followerSnapshot);
+            ByteString chunkData = ByteString.EMPTY;
+            int offset = 0;
+            int snapshotLength = bsSnapshot.size();
+            int i = 1;
+            int chunkIndex = 1;
+
+            do {
+                chunkData = getNextChunk(bsSnapshot, offset);
+                final InstallSnapshot installSnapshot =
+                    new InstallSnapshot(1, "leader-1", i, 1,
+                        chunkData, chunkIndex, 3);
+                follower.handleMessage(leaderActor, installSnapshot);
+                offset = offset + 50;
+                i++;
+                chunkIndex++;
+            } while ((offset+50) < snapshotLength);
+
+            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
+            follower.handleMessage(leaderActor, installSnapshot3);
+
+            String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
+                @Override
+                protected String match(Object o) throws Exception {
+                    if (o instanceof ApplySnapshot) {
+                        ApplySnapshot as = (ApplySnapshot)o;
+                        if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
+                            return "applySnapshot-lastIndex-mismatch";
+                        }
+                        if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
+                            return "applySnapshot-lastAppliedTerm-mismatch";
+                        }
+                        if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
+                            return "applySnapshot-lastAppliedIndex-mismatch";
+                        }
+                        if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
+                            return "applySnapshot-lastTerm-mismatch";
+                        }
+                        return "applySnapshot";
+                    }
+
+                    return "ignoreCase";
+                }
+            }.get();
+
+            // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
+            assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+            String applySnapshotMatch = "";
+            for (String reply: matches) {
+                if (reply.startsWith("applySnapshot")) {
+                    applySnapshotMatch = reply;
+                }
+            }
+
+            assertEquals("applySnapshot", applySnapshotMatch);
+
+            Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+            assertNotNull(messages);
+            assertTrue(messages instanceof List);
+            List<Object> listMessages = (List<Object>) messages;
+
+            int installSnapshotReplyReceivedCount = 0;
+            for (Object message: listMessages) {
+                if (message instanceof InstallSnapshotReply) {
+                    ++installSnapshotReplyReceivedCount;
+                }
+            }
+
+            assertEquals(3, installSnapshotReplyReceivedCount);
+
+        }};
+    }
+
+    @Test
+    public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor = getSystem().actorOf(Props.create(
+                        MessageCollectorActor.class));
+
+                MockRaftActorContext context = (MockRaftActorContext)
+                        createActorContext(getRef());
+
+                Follower follower = (Follower) createBehavior(context);
+
+                HashMap<String, String> followerSnapshot = new HashMap<>();
+                followerSnapshot.put("1", "A");
+                followerSnapshot.put("2", "B");
+                followerSnapshot.put("3", "C");
+
+                ByteString bsSnapshot = toByteString(followerSnapshot);
+
+                final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
+                follower.handleMessage(leaderActor, installSnapshot);
+
+                Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+
+                assertNotNull(messages);
+                assertTrue(messages instanceof List);
+                List<Object> listMessages = (List<Object>) messages;
+
+                int installSnapshotReplyReceivedCount = 0;
+                for (Object message: listMessages) {
+                    if (message instanceof InstallSnapshotReply) {
+                        ++installSnapshotReplyReceivedCount;
+                    }
+                }
+
+                assertEquals(1, installSnapshotReplyReceivedCount);
+                InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
+                assertEquals(false, reply.isSuccess());
+                assertEquals(-1, reply.getChunkIndex());
+                assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+
+
+            }};
+    }
+
+    public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
+        return MessageCollectorActor.getAllMessages(actor);
+    }
+
+    public ByteString getNextChunk (ByteString bs, int offset){
+        int snapshotLength = bs.size();
+        int start = offset;
+        int size = 50;
+        if (50 > snapshotLength) {
+            size = snapshotLength;
+        } else {
+            if ((start + 50) > snapshotLength) {
+                size = snapshotLength - start;
+            }
+        }
+        return bs.substring(start, start + size);
+    }
+
+    private ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream b = null;
+        ObjectOutputStream o = null;
+        try {
+            try {
+                b = new ByteArrayOutputStream();
+                o = new ObjectOutputStream(b);
+                o.writeObject(state);
+                byte[] snapshotBytes = b.toByteArray();
+                return ByteString.copyFrom(snapshotBytes);
+            } finally {
+                if (o != null) {
+                    o.flush();
+                    o.close();
+                }
+                if (b != null) {
+                    b.close();
+                }
+            }
+        } catch (IOException e) {
+            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
+        }
+        return null;
+    }
 }