Bug 3570: Use SnapShot lastAppliedIndex for install snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 07548d6a9248b450e9ee478fabbe6dd7d357829e..89b83b3b369e8e5ed70349e2a1c8fcd223d7c9b4 100644 (file)
@@ -9,10 +9,10 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -439,8 +440,8 @@ public class LeaderTest extends AbstractLeaderTest {
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int newEntryIndex = 4;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
@@ -448,12 +449,15 @@ public class LeaderTest extends AbstractLeaderTest {
         // set the snapshot variables in replicatedlog
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
         //set follower timeout to 2 mins, helps during debugging
         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         // new entry
         ReplicatedLogImplEntry entry =
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -463,7 +467,8 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
 
@@ -489,7 +494,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
-        assertEquals(snapshotIndex, is.getLastIncludedIndex());
+        assertEquals(commitIndex, is.getLastIncludedIndex());
     }
 
     @Test
@@ -573,7 +578,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(Optional.<ByteString>absent());
+        leader.setSnapshot(null);
 
         // new entry
         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -616,8 +621,8 @@ public class LeaderTest extends AbstractLeaderTest {
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int lastAppliedIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -625,15 +630,22 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(lastAppliedIndex);
+        actorContext.setLastApplied(lastAppliedIndex);
 
         leader = new Leader(actorContext);
 
-        // Ignore initial heartbeat.
+        // Initial heartbeat.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -642,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest {
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
-        assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
 
         assertEquals(currentTerm, installSnapshot.getTerm());
@@ -654,15 +666,18 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         // Ignore initial heartbeat.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
@@ -678,7 +693,8 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
@@ -698,9 +714,8 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
-        assertEquals(snapshotIndex, fli.getMatchIndex());
-        assertEquals(snapshotIndex, fli.getMatchIndex());
-        assertEquals(snapshotIndex + 1, fli.getNextIndex());
+        assertEquals(commitIndex, fli.getMatchIndex());
+        assertEquals(commitIndex + 1, fli.getNextIndex());
     }
 
     @Test
@@ -709,8 +724,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -724,10 +739,13 @@ public class LeaderTest extends AbstractLeaderTest {
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
         actorContext.setConfigParams(configParams);
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -739,9 +757,11 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+        leader.setSnapshot(snapshot);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
@@ -780,8 +800,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -792,10 +812,13 @@ public class LeaderTest extends AbstractLeaderTest {
             }
         });
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -807,10 +830,12 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+        leader.setSnapshot(snapshot);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
@@ -839,8 +864,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -851,10 +876,13 @@ public class LeaderTest extends AbstractLeaderTest {
             }
         });
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -866,9 +894,11 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+        leader.setSnapshot(snapshot);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);