BUG 2185: Expand the scope of sync status to cover a slow follower 26/26426/2
authorMoiz Raja <moraja@cisco.com>
Sat, 22 Aug 2015 02:13:05 +0000 (19:13 -0700)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 14:41:34 +0000 (10:41 -0400)
Previously sync status was used only in the startup scenario
to make the controller appear to the external world as not
synced up unless it had received atleast data till the commitIndex
which the leader reported when it sent the follower it's first
heartbeat.

Now we also track when a new update is sent from the Leader to the
Follower and if the Follower is behind the Leader by a threshold
(hardcoded for now) then we consider the Follower as out-of-sync

Also I added the member name in the ShardManager bean so that is another
place from which we can figure out on which node we are running.

Change-Id: I1ba02575a0a1ac5d601af559f41971f2f5736f9d
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 47890ea59104fe349c5637d51a44d76f3571ce78)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTrackerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java

index 974ec47..c853561 100644 (file)
@@ -17,7 +17,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -40,12 +39,14 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private SnapshotTracker snapshotTracker = null;
 
-    private final InitialSyncStatusTracker initialSyncStatusTracker;
+    private final SyncStatusTracker initialSyncStatusTracker;
+
+    private static final int SYNC_THRESHOLD = 10;
 
     public Follower(RaftActorContext context) {
         super(context, RaftState.Follower);
 
-        initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+        initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if(context.getRaftPolicy().automaticElectionsEnabled()) {
             if (context.getPeerAddresses().isEmpty()) {
@@ -395,36 +396,4 @@ public class Follower extends AbstractRaftActorBehavior {
     SnapshotTracker getSnapshotTracker(){
         return snapshotTracker;
     }
-
-    private class InitialSyncStatusTracker {
-
-        private static final long INVALID_LOG_INDEX = -2L;
-        private long initialLeaderCommit = INVALID_LOG_INDEX;
-        private boolean initialSyncUpDone = false;
-        private String syncedLeaderId = null;
-        private final ActorRef actor;
-
-        public InitialSyncStatusTracker(ActorRef actor) {
-            this.actor = actor;
-        }
-
-        public void update(String leaderId, long leaderCommit, long commitIndex){
-
-            if(!leaderId.equals(syncedLeaderId)){
-                initialSyncUpDone = false;
-                initialLeaderCommit = INVALID_LOG_INDEX;
-                syncedLeaderId = leaderId;
-            }
-
-            if(!initialSyncUpDone){
-                if(initialLeaderCommit == INVALID_LOG_INDEX){
-                    actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender());
-                    initialLeaderCommit = leaderCommit;
-                } else if(commitIndex >= initialLeaderCommit){
-                    actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender());
-                    initialSyncUpDone = true;
-                }
-            }
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java
new file mode 100644 (file)
index 0000000..85622a5
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+
+/**
+ * The SyncStatusTracker tracks if a Follower is in sync with any given Leader or not
+ * When an update is received from the Leader and the update happens to be the first update
+ * from that Leader then the SyncStatusTracker will not mark the Follower as not in-sync till the
+ * Followers commitIndex matches the commitIndex that the Leader sent in it's very first update.
+ * Subsequently when an update is received the tracker will consider the Follower to be out of
+ * sync if it is behind by 'syncThreshold' commits.
+ */
+public class SyncStatusTracker {
+
+    private static final boolean IN_SYNC = true;
+    private static final boolean NOT_IN_SYNC = false;
+    private static final boolean FORCE_STATUS_CHANGE = true;
+
+    private final String id;
+    private String syncedLeaderId = null;
+    private final ActorRef actor;
+    private final int syncThreshold;
+    private boolean syncStatus = false;
+    private long minimumExpectedIndex = -2L;
+
+    public SyncStatusTracker(ActorRef actor, String id, int syncThreshold) {
+        this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
+        this.id = Preconditions.checkNotNull(id, "id should not be null");
+        Preconditions.checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0");
+        this.syncThreshold = syncThreshold;
+    }
+
+    public void update(String leaderId, long leaderCommit, long commitIndex){
+        leaderId = Preconditions.checkNotNull(leaderId, "leaderId should not be null");
+
+        if(!leaderId.equals(syncedLeaderId)){
+            minimumExpectedIndex = leaderCommit;
+            changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE);
+            syncedLeaderId = leaderId;
+            return;
+        }
+
+        if((leaderCommit - commitIndex) > syncThreshold){
+            changeSyncStatus(NOT_IN_SYNC);
+        } else if((leaderCommit - commitIndex) <= syncThreshold && commitIndex >= minimumExpectedIndex) {
+            changeSyncStatus(IN_SYNC);
+        }
+    }
+
+    private void changeSyncStatus(boolean newSyncStatus){
+        changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE);
+    }
+
+    private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange){
+        if(syncStatus == newSyncStatus && !forceStatusChange){
+            return;
+        }
+        actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
+        syncStatus = newSyncStatus;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTrackerTest.java
new file mode 100644 (file)
index 0000000..3ffaeb8
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+public class SyncStatusTrackerTest extends AbstractActorTest {
+    protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+    private final TestActorRef<MessageCollectorActor> listener = actorFactory.createTestActor(
+            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("listener"));
+
+    @After
+    public void tearDown(){
+        actorFactory.close();
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        SyncStatusTracker tracker = new SyncStatusTracker(listener, "commit-tracker", 10);
+
+        // When leader-1 sends the first update message the listener should receive a syncStatus notification
+        // with status set to false
+        tracker.update("leader-1", 100, 99);
+        FollowerInitialSyncUpStatus status =
+                MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertEquals(false, status.isInitialSyncDone());
+        MessageCollectorActor.clearMessages(listener);
+
+        // At a minimum the follower should have the commit index that the new leader sent it in the first message
+        // Also the commit index must be below the syncThreshold. If both conditions are met a new sync status
+        // message with status = true should be expected
+        tracker.update("leader-1", 105, 101);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertEquals(true, status.isInitialSyncDone());
+        MessageCollectorActor.clearMessages(listener);
+
+        // If a subsequent message is received and if the difference between the followers commit index and
+        // the leaders commit index is below the syncThreshold then no status notification must be issues
+        tracker.update("leader-1", 108, 101);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertNull("No status message should be received", status);
+
+        // If the follower falls behind the leader by more than the syncThreshold then the listener should
+        // receive a syncStatus notification with status = false
+        tracker.update("leader-1", 150, 101);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertNotNull("No sync status message was received", status);
+
+        assertEquals(false, status.isInitialSyncDone());
+        MessageCollectorActor.clearMessages(listener);
+
+        // If the follower is not caught up yet it should not receive any further notification
+        tracker.update("leader-1", 150, 125);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertNull("No status message should be received", status);
+
+        // Once the syncThreshold is met a new syncStatus notification should be issued
+        tracker.update("leader-1", 160, 155);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertEquals(true, status.isInitialSyncDone());
+        MessageCollectorActor.clearMessages(listener);
+
+        // When a new leader starts sending update messages a new syncStatus notification should be immediately
+        // triggered with status = false
+        tracker.update("leader-2", 160, 155);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertEquals(false, status.isInitialSyncDone());
+        MessageCollectorActor.clearMessages(listener);
+
+        // If an update is received from a new leader which is still below the minimum expected index then
+        // syncStatus should not be changed
+        tracker.update("leader-2", 160, 159);
+
+        status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class);
+
+        assertNull("No status message should be received", status);
+
+    }
+
+    @Test
+    public void testConstructorActorShouldNotBeNull(){
+        try {
+            new SyncStatusTracker(null, "commit-tracker", 10);
+            fail("A NullPointerException was expected");
+        } catch(NullPointerException e){
+            assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("actor "));
+        }
+    }
+
+    @Test
+    public void testConstructorIdShouldNotBeNull(){
+        try {
+            new SyncStatusTracker(listener, null, 10);
+            fail("A NullPointerException was expected");
+        } catch(NullPointerException e){
+            assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("id "));
+        }
+    }
+
+    @Test
+    public void testConstructorSyncThresholdShouldNotBeNegative(){
+        try {
+            new SyncStatusTracker(listener, "commit-tracker", -1);
+            fail("An IllegalArgumentException was expected");
+        } catch(IllegalArgumentException e){
+            assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("syncThreshold "));
+        }
+    }
+
+}
\ No newline at end of file
index e05dd7c..5f59672 100644 (file)
@@ -622,7 +622,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
 
-        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
 
         mBean.setShardManager(this);
index aaac644..5de46cb 100644 (file)
@@ -29,20 +29,22 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
 
     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class);
 
+    private final String memberName;
     private final List<String> localShards;
 
     private boolean syncStatus = false;
 
     private ShardManager shardManager;
 
-    public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+    public ShardManagerInfo(String memberName, String name, String mxBeanType, List<String> localShards) {
         super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+        this.memberName = memberName;
         this.localShards = localShards;
     }
 
-    public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+    public static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType,
             List<String> localShards){
-        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
+        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards);
 
         shardManagerInfo.registerMBean();
 
@@ -59,6 +61,11 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo
         return this.syncStatus;
     }
 
+    @Override
+    public String getMemberName() {
+        return memberName;
+    }
+
     @Override
     public void switchAllLocalShardsState(String newState, long term) {
         LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
index da0331e..92a43d2 100644 (file)
@@ -23,6 +23,13 @@ public interface ShardManagerInfoMBean {
      */
     boolean getSyncStatus();
 
+    /**
+     * Get the name of of the current member
+     *
+     * @return
+     */
+    String getMemberName();
+
     /**
      * Switch the Raft Behavior of all the local shards to the newBehavior
      *