From: Moiz Raja Date: Wed, 18 Feb 2015 20:33:53 +0000 (-0800) Subject: BUG 2733 : Leader should always schedule heartbeats X-Git-Tag: release/lithium~521 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F76%2F15476%2F5 BUG 2733 : Leader should always schedule heartbeats This ensures that lagging or new followers are not starved as this can potentially destabilize the cluster Change-Id: Ic83d9d595a1222550286f3eb29d4cdef7bf1bccd Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 1552096ef1..be51ba069c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -126,6 +126,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) sendAppendEntries(0, false); + + // It is important to schedule this heartbeat here + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } /** @@ -304,28 +307,26 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - try { - if (message instanceof SendHeartBeat) { - beforeSendHeartbeat(); - sendHeartBeat(); - return this; + if (message instanceof SendHeartBeat) { + beforeSendHeartbeat(); + sendHeartBeat(); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + return this; - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); - sendInstallSnapshot(); + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); - } else if (message instanceof Replicate) { - replicate((Replicate) message); + } else if (message instanceof Replicate) { + replicate((Replicate) message); - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply((InstallSnapshotReply) message); - } - } finally { - scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + return super.handleMessage(sender, message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java new file mode 100644 index 0000000000..dd3ed2347c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.behaviors; + +import static org.junit.Assert.assertTrue; +import akka.actor.ActorRef; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; + +public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest{ + + /** + * When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where + * if no follower responded to an initial AppendEntries heartbeats would not be sent to it. This test verifies + * that regardless of whether followers respond or not we schedule heartbeats. + * + * @throws Exception + */ + @Test + public void testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries() throws Exception { + logStart("testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries"); + new JavaTestKit(getSystem()) {{ + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); + + TestActorRef leaderActor = + actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); + ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); + + + leaderActorContext.setPeerAddresses(peerAddresses); + + RaftActorBehavior leader = createBehavior(leaderActorContext); + + leaderActor.underlyingActor().setBehavior(leader); + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + List allMessages = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + + // Need more than 1 heartbeat to be delivered because we waited for 1 second with heartbeat interval 200ms + assertTrue(String.format("%s messages is less than expected", allMessages.size()), + allMessages.size() > 1); + + }}; + } + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java index b37ace9560..e16d765cde 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java @@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { +public class IsolatedLeaderTest extends AbstractLeaderTest { private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); @@ -44,7 +44,7 @@ public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest { @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Leader(actorContext); + return new IsolatedLeader(actorContext); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 34e1ed9b3b..c57fce1cd5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -41,11 +41,12 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractRaftActorBehaviorTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; @@ -742,7 +743,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { private MockRaftActorContext createActorContextWithFollower() { MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, + actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, followerActor.path().toString()).build()); return actorContext; } @@ -756,22 +757,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { return context; } - public static class ForwardMessageToBehaviorActor extends MessageCollectorActor { - AbstractRaftActorBehavior behavior; - - @Override public void onReceive(Object message) throws Exception { - if(behavior != null) { - behavior.handleMessage(sender(), message); - } - - super.onReceive(message); - } - - public static Props props() { - return Props.create(ForwardMessageToBehaviorActor.class); - } - } - @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); @@ -781,7 +766,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -834,7 +819,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -871,7 +856,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(2, appendEntriesReply.getLogLastIndex()); assertEquals(1, appendEntriesReply.getLogLastTerm()); - leaderActor.underlyingActor().behavior = leader; + leaderActor.underlyingActor().setBehavior(follower); leader.handleMessage(followerActor, appendEntriesReply); leaderActor.underlyingActor().clear(); @@ -1076,7 +1061,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { followerActorContext.setConfigParams(configParams); Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().behavior = follower; + followerActor.underlyingActor().setBehavior(follower); leaderActorContext.getReplicatedLog().removeFrom(0); leaderActorContext.setCommitIndex(-1); @@ -1134,6 +1119,66 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { follower.close(); } + @Test + public void testLaggingFollowerStarvation() throws Exception { + logStart("testLaggingFollowerStarvation"); + new JavaTestKit(getSystem()) {{ + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); + + TestActorRef leaderActor = + actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); + ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getTermInformation().update(1, leaderActorId); + + RaftActorBehavior leader = createBehavior(leaderActorContext); + + leaderActor.underlyingActor().setBehavior(leader); + + for(int i=1;i<6;i++) { + // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); + assertTrue(newBehavior == leader); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply + List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + + assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), + heartbeats.size() > 1); + + // Check if follower-2 got AppendEntries during this time and was not starved + List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + + assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), + appendEntries.size() > 1); + + }}; + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java new file mode 100644 index 0000000000..9bcfcd9091 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.utils; + +import akka.actor.Props; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; + +public class ForwardMessageToBehaviorActor extends MessageCollectorActor { + private RaftActorBehavior behavior; + + @Override + public void onReceive(Object message) throws Exception { + if(behavior != null) { + behavior.handleMessage(sender(), message); + } + + super.onReceive(message); + } + + public static Props props() { + return Props.create(ForwardMessageToBehaviorActor.class); + } + + public void setBehavior(RaftActorBehavior behavior){ + this.behavior = behavior; + } +} + diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 662c12180d..62f163fb7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.utils; import akka.actor.ActorRef; +import akka.actor.Props; import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; @@ -123,4 +124,8 @@ public class MessageCollectorActor extends UntypedActor { throw new TimeoutException("Actor not ready in time."); } + + public static Props props() { + return Props.create(MessageCollectorActor.class); + } }