Introduce a mechanism for a Follower to signal it's sync up status 54/15954/2
authorMoiz Raja <moraja@cisco.com>
Tue, 3 Mar 2015 03:49:41 +0000 (19:49 -0800)
committerMoiz Raja <moraja@cisco.com>
Tue, 3 Mar 2015 23:22:23 +0000 (15:22 -0800)
This patch introduces a new message FollowerInitialSyncUpStatus. The
purpose of the message is to inform the RaftActor or a sub-class thereof
whether the Follower's commitIndex is now atleast at the same level
as the Leader was when it first sent a heartbeat to the Follower.

This will hopefully be useful in a rolling upgrade scenario where
we may have just brought up a new node and need to wait for it to be
synced with the current leader before we bring down and upgrade another
node in the cluster.

Change-Id: If19e4d98c2be708fc6a35823ca92cfac7ca87394
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java
new file mode 100644 (file)
index 0000000..3ce1f5d
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * The FollowerInitialSyncUpStatus is sent by a Follower to inform any RaftActor subclass whether the Follower
+ * is at least at the same commitIndex as the Leader was when it sent the follower the very first heartbeat.
+ *
+ * This status can be used to determine if a Follower has caught up with the current Leader in an upgrade scenario
+ * for example.
+ *
+ */
+public class FollowerInitialSyncUpStatus {
+    private final boolean initialSyncDone;
+
+    public FollowerInitialSyncUpStatus(boolean initialSyncDone){
+        this.initialSyncDone = initialSyncDone;
+    }
+
+    public boolean isInitialSyncDone() {
+        return initialSyncDone;
+    }
+}
index c799441d603597ea25d530db8dc6eddc9ced68b6..618865cb88eb8877cdcfdcfb29208c80707c2c0f 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -36,12 +37,18 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  */
 public class Follower extends AbstractRaftActorBehavior {
 
+
+
     private SnapshotTracker snapshotTracker = null;
 
+    private final InitialSyncStatusTracker initialSyncStatusTracker;
+
     public Follower(RaftActorContext context) {
         super(context, RaftState.Follower);
 
         scheduleElection(electionDuration());
+
+        initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
     }
 
     private boolean isLogEntryPresent(long index){
@@ -71,6 +78,10 @@ public class Follower extends AbstractRaftActorBehavior {
         return -1;
     }
 
+    private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){
+        initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
+    }
+
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
                                                               AppendEntries appendEntries) {
 
@@ -97,6 +108,7 @@ public class Follower extends AbstractRaftActorBehavior {
         long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
         boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
 
+        updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
 
         boolean outOfSync = true;
 
@@ -280,7 +292,6 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
-
         LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
                     installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
@@ -289,6 +300,8 @@ public class Follower extends AbstractRaftActorBehavior {
             snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
         }
 
+        updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
+
         try {
             if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())){
@@ -338,4 +351,36 @@ public class Follower extends AbstractRaftActorBehavior {
     SnapshotTracker getSnapshotTracker(){
         return snapshotTracker;
     }
+
+    private static class InitialSyncStatusTracker {
+
+        private static final long INVALID_LOG_INDEX = -2L;
+        private long initialLeaderCommit = INVALID_LOG_INDEX;
+        private boolean initialSyncUpDone = false;
+        private String syncedLeaderId = null;
+        private final ActorRef actor;
+
+        public InitialSyncStatusTracker(ActorRef actor) {
+            this.actor = actor;
+        }
+
+        public void update(String leaderId, long leaderCommit, long commitIndex){
+
+            if(!leaderId.equals(syncedLeaderId)){
+                initialSyncUpDone = false;
+                initialLeaderCommit = INVALID_LOG_INDEX;
+                syncedLeaderId = leaderId;
+            }
+
+            if(!initialSyncUpDone){
+                if(initialLeaderCommit == INVALID_LOG_INDEX){
+                    actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender());
+                    initialLeaderCommit = leaderCommit;
+                } else if(commitIndex >= initialLeaderCommit){
+                    actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender());
+                    initialSyncUpDone = true;
+                }
+            }
+        }
+    }
 }
index 4e8e7fe11bad4fdeb085ffc1ac60450cbbfeda11..29fb613327f23b72755514ce6f2c256d447f8a83 100644 (file)
@@ -1,6 +1,8 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
@@ -19,6 +21,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -118,6 +121,178 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("isVoteGranted", false, reply.isVoteGranted());
     }
 
+
+    @Test
+    public void testHandleFirstAppendEntries() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+    }
+
+    @Test
+    public void testHandleSyncUpAppendEntries() throws Exception {
+        logStart("testHandleSyncUpAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
+
+        followerActor.underlyingActor().clear();
+
+        // Sending the same message again should not generate another message
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertNull(syncStatus);
+
+    }
+
+    @Test
+    public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
+        logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // leader-2 is becoming the leader now and it says the commitIndex is 45
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        // We get a new message saying initial status is not done
+        assertFalse(syncStatus.isInitialSyncDone());
+
+    }
+
+
+    @Test
+    public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
+        logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // leader-2 is becoming the leader now and it says the commitIndex is 45
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        // We get a new message saying initial status is not done
+        assertFalse(syncStatus.isInitialSyncDone());
+
+    }
+
+
     /**
      * This test verifies that when an AppendEntries RPC is received by a RaftActor
      * with a commitIndex that is greater than what has been applied to the
@@ -358,7 +533,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleAppendAfterInstallingSnapshot(){
+    public void testHandleAppendEntriesAfterInstallingSnapshot(){
         logStart("testHandleAppendAfterInstallingSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -449,7 +624,64 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
         }
 
-        Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+    }
+
+    @Test
+    public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
+        logStart("testInitialSyncUpWithHandleInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        HashMap<String, String> followerSnapshot = new HashMap<>();
+        followerSnapshot.put("1", "A");
+        followerSnapshot.put("2", "B");
+        followerSnapshot.put("3", "C");
+
+        ByteString bsSnapshot  = toByteString(followerSnapshot);
+        int offset = 0;
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+        int chunkIndex = 1;
+        InstallSnapshot lastInstallSnapshot = null;
+
+        for(int i = 0; i < totalChunks; i++) {
+            ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                    chunkData, chunkIndex, totalChunks);
+            follower.handleMessage(leaderActor, lastInstallSnapshot);
+            offset = offset + 50;
+            lastIncludedIndex++;
+            chunkIndex++;
+        }
+
+        FollowerInitialSyncUpStatus syncStatus =
+                MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
     }
 
     @Test
@@ -479,7 +711,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getTerm", 1, reply.getTerm());
         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
 
-        Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
     }
 
     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){