From: Moiz Raja Date: Sat, 22 Aug 2015 02:13:05 +0000 (-0700) Subject: BUG 2185: Expand the scope of sync status to cover a slow follower X-Git-Tag: release/beryllium~321 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=bbc8a16efdc6bfa0d742b73af3374a33a12e2a1c BUG 2185: Expand the scope of sync status to cover a slow follower Previously sync status was used only in the startup scenario to make the controller appear to the external world as not synced up unless it had received at least the data up to the commitIndex which the leader reported when it sent the follower its first heartbeat. Now we also track when a new update is sent from the Leader to the Follower and if the Follower is behind the Leader by more than a threshold (hardcoded for now) then we consider the Follower as out-of-sync. Also I added the member name in the ShardManager bean so that there is another place from which we can figure out on which node we are running. Change-Id: I1ba02575a0a1ac5d601af559f41971f2f5736f9d Signed-off-by: Moiz Raja (cherry picked from commit 47890ea59104fe349c5637d51a44d76f3571ce78) --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 974ec47585..c8535614a9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -17,7 +17,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; -import 
org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -40,12 +39,14 @@ public class Follower extends AbstractRaftActorBehavior { private SnapshotTracker snapshotTracker = null; - private final InitialSyncStatusTracker initialSyncStatusTracker; + private final SyncStatusTracker initialSyncStatusTracker; + + private static final int SYNC_THRESHOLD = 10; public Follower(RaftActorContext context) { super(context, RaftState.Follower); - initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); + initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); if(context.getRaftPolicy().automaticElectionsEnabled()) { if (context.getPeerAddresses().isEmpty()) { @@ -395,36 +396,4 @@ public class Follower extends AbstractRaftActorBehavior { SnapshotTracker getSnapshotTracker(){ return snapshotTracker; } - - private class InitialSyncStatusTracker { - - private static final long INVALID_LOG_INDEX = -2L; - private long initialLeaderCommit = INVALID_LOG_INDEX; - private boolean initialSyncUpDone = false; - private String syncedLeaderId = null; - private final ActorRef actor; - - public InitialSyncStatusTracker(ActorRef actor) { - this.actor = actor; - } - - public void update(String leaderId, long leaderCommit, long commitIndex){ - - if(!leaderId.equals(syncedLeaderId)){ - initialSyncUpDone = false; - initialLeaderCommit = INVALID_LOG_INDEX; - syncedLeaderId = leaderId; - } - - if(!initialSyncUpDone){ - if(initialLeaderCommit == INVALID_LOG_INDEX){ - actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender()); - initialLeaderCommit = leaderCommit; - } else if(commitIndex >= initialLeaderCommit){ - actor.tell(new FollowerInitialSyncUpStatus(true, getId()), 
ActorRef.noSender()); - initialSyncUpDone = true; - } - } - } - } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java new file mode 100644 index 0000000000..85622a5908 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; + +/** + * The SyncStatusTracker tracks whether a Follower is in sync with any given Leader or not. + * When an update is received from the Leader and the update happens to be the first update + * from that Leader then the SyncStatusTracker will mark the Follower as not in-sync till the + * Follower's commitIndex reaches the commitIndex that the Leader sent in its very first update. + * Subsequently when an update is received the tracker will consider the Follower to be out of + * sync if it is behind by more than 'syncThreshold' commits. 
+ */ +public class SyncStatusTracker { + + private static final boolean IN_SYNC = true; + private static final boolean NOT_IN_SYNC = false; + private static final boolean FORCE_STATUS_CHANGE = true; + + private final String id; + private String syncedLeaderId = null; + private final ActorRef actor; + private final int syncThreshold; + private boolean syncStatus = false; + private long minimumExpectedIndex = -2L; + + public SyncStatusTracker(ActorRef actor, String id, int syncThreshold) { + this.actor = Preconditions.checkNotNull(actor, "actor should not be null"); + this.id = Preconditions.checkNotNull(id, "id should not be null"); + Preconditions.checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0"); + this.syncThreshold = syncThreshold; + } + + public void update(String leaderId, long leaderCommit, long commitIndex){ + leaderId = Preconditions.checkNotNull(leaderId, "leaderId should not be null"); + + if(!leaderId.equals(syncedLeaderId)){ + minimumExpectedIndex = leaderCommit; + changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE); + syncedLeaderId = leaderId; + return; + } + + if((leaderCommit - commitIndex) > syncThreshold){ + changeSyncStatus(NOT_IN_SYNC); + } else if((leaderCommit - commitIndex) <= syncThreshold && commitIndex >= minimumExpectedIndex) { + changeSyncStatus(IN_SYNC); + } + } + + private void changeSyncStatus(boolean newSyncStatus){ + changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE); + } + + private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange){ + if(syncStatus == newSyncStatus && !forceStatusChange){ + return; + } + actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender()); + syncStatus = newSyncStatus; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTrackerTest.java 
b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTrackerTest.java new file mode 100644 index 0000000000..3ffaeb811c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTrackerTest.java @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + + +package org.opendaylight.controller.cluster.raft.behaviors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; + +public class SyncStatusTrackerTest extends AbstractActorTest { + protected final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + + private final TestActorRef listener = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("listener")); + + @After + public void tearDown(){ + actorFactory.close(); + } + + @Test + public void testUpdate() throws Exception { + SyncStatusTracker tracker = new SyncStatusTracker(listener, "commit-tracker", 10); + + // When leader-1 sends the first update message the listener should receive a syncStatus notification + // with status 
set to false + tracker.update("leader-1", 100, 99); + FollowerInitialSyncUpStatus status = + MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertEquals(false, status.isInitialSyncDone()); + MessageCollectorActor.clearMessages(listener); + + // At a minimum the follower should have the commit index that the new leader sent it in the first message. + // Also the follower's commit index must be within the syncThreshold of the leader's. If both conditions are met a new sync status + // message with status = true should be expected + tracker.update("leader-1", 105, 101); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertEquals(true, status.isInitialSyncDone()); + MessageCollectorActor.clearMessages(listener); + + // If a subsequent message is received and if the difference between the follower's commit index and + // the leader's commit index is within the syncThreshold then no status notification must be issued + tracker.update("leader-1", 108, 101); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertNull("No status message should be received", status); + + // If the follower falls behind the leader by more than the syncThreshold then the listener should + // receive a syncStatus notification with status = false + tracker.update("leader-1", 150, 101); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertNotNull("No sync status message was received", status); + + assertEquals(false, status.isInitialSyncDone()); + MessageCollectorActor.clearMessages(listener); + + // If the follower is not caught up yet the listener should not receive any further notification + tracker.update("leader-1", 150, 125); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertNull("No status message should be received", status); + + // Once the syncThreshold is met a new 
syncStatus notification should be issued + tracker.update("leader-1", 160, 155); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertEquals(true, status.isInitialSyncDone()); + MessageCollectorActor.clearMessages(listener); + + // When a new leader starts sending update messages a new syncStatus notification should be immediately + // triggered with status = false + tracker.update("leader-2", 160, 155); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertEquals(false, status.isInitialSyncDone()); + MessageCollectorActor.clearMessages(listener); + + // If an update is received from a new leader which is still below the minimum expected index then + // syncStatus should not be changed + tracker.update("leader-2", 160, 159); + + status = MessageCollectorActor.getFirstMatching(listener, FollowerInitialSyncUpStatus.class); + + assertNull("No status message should be received", status); + + } + + @Test + public void testConstructorActorShouldNotBeNull(){ + try { + new SyncStatusTracker(null, "commit-tracker", 10); + fail("A NullPointerException was expected"); + } catch(NullPointerException e){ + assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("actor ")); + } + } + + @Test + public void testConstructorIdShouldNotBeNull(){ + try { + new SyncStatusTracker(listener, null, 10); + fail("A NullPointerException was expected"); + } catch(NullPointerException e){ + assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("id ")); + } + } + + @Test + public void testConstructorSyncThresholdShouldNotBeNegative(){ + try { + new SyncStatusTracker(listener, "commit-tracker", -1); + fail("An IllegalArgumentException was expected"); + } catch(IllegalArgumentException e){ + assertTrue("Invalid error message :" + e.getMessage(), e.getMessage().contains("syncThreshold ")); + } + } + +} \ No newline at end of file diff --git 
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e05dd7c09b..5f59672ed9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -622,7 +622,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } - mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, + mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, datastoreContext.getDataStoreMXBeanType(), localShardActorNames); mBean.setShardManager(this); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java index aaac644b55..5de46cb87b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java @@ -29,20 +29,22 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo private static final Logger LOG = LoggerFactory.getLogger(ShardManagerInfo.class); + private final String memberName; private final List localShards; private boolean syncStatus = false; private ShardManager shardManager; - 
public ShardManagerInfo(String name, String mxBeanType, List localShards) { + public ShardManagerInfo(String memberName, String name, String mxBeanType, List localShards) { super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER); + this.memberName = memberName; this.localShards = localShards; } - public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType, + public static ShardManagerInfo createShardManagerMBean(String memberName, String name, String mxBeanType, List localShards){ - ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards); + ShardManagerInfo shardManagerInfo = new ShardManagerInfo(memberName, name, mxBeanType, localShards); shardManagerInfo.registerMBean(); @@ -59,6 +61,11 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo return this.syncStatus; } + @Override + public String getMemberName() { + return memberName; + } + @Override public void switchAllLocalShardsState(String newState, long term) { LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java index da0331eb77..92a43d28f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java @@ -23,6 +23,13 @@ public interface ShardManagerInfoMBean { */ boolean getSyncStatus(); + /** + * Get the name of the current member. + * + * @return the name of the current member + */ + String getMemberName(); + /** * Switch the Raft 
Behavior of all the local shards to the newBehavior *