From: Tom Pantelis Date: Wed, 21 Dec 2016 14:20:36 +0000 (-0500) Subject: Bug 7391: Fix out-of-order LeaderStateChange events X-Git-Tag: release/carbon~343 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=000960f6451af770f5463e41e1fb6defb6f3ab27 Bug 7391: Fix out-of-order LeaderStateChange events On leader transition, the current leader first sends out LeaderTransitioning events to each follower to tell them the current leader is being deposed. The followers send out a LeaderStateChange event with a null leaderId which is picked up by the ShardManager to delay subsequent transaction activity to the shard until a new leader is elected. However, it's possible the LeaderTransitioning message does not reach a follower until after the leader transition occurs (e.g. due to dispatching delay in the caller or the network). This results in a LeaderStateChange event with a null leaderId being delivered after the LeaderStateChange with the new leaderId. I wrote a unit test that reproduces it. We need to handle LeaderTransitioning events in a CAS-like manner, i.e. include the leaderId with the LeaderTransitioning message and only issue the LeaderStateChange event with a null leaderId if the current leaderId matches the leaderId in the LeaderTransitioning message. 
Change-Id: I24e8bbf7707858ac4ed62f3a979cc0403daff8ac Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c08b20f2d..fff6ce9ed1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -278,7 +278,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SwitchBehavior) { switchBehavior((SwitchBehavior) message); } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning(); + onLeaderTransitioning((LeaderTransitioning)message); } else if (message instanceof Shutdown) { onShutDown(); } else if (message instanceof Runnable) { @@ -376,10 +376,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void onLeaderTransitioning() { - LOG.debug("{}: onLeaderTransitioning", persistenceId()); + private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) { + LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning); Optional roleChangeNotifier = getRoleChangeNotifier(); - if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() + && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java 
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index b087914dec..32790d0f47 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -79,7 +79,7 @@ public class RaftActorLeadershipTransferCohort { for (String peerId: context.getPeerIds()) { ActorSelection followerActor = context.getPeerActorSelection(peerId); if (followerActor != null) { - followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor()); + followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java index 32e24b704f..fd2fb68677 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java @@ -7,7 +7,9 @@ */ package org.opendaylight.controller.cluster.raft.base.messages; +import com.google.common.base.Preconditions; import java.io.Serializable; +import javax.annotation.Nonnull; /** * Message sent from a leader to its followers to indicate leadership transfer is starting. 
@@ -16,13 +18,20 @@ import java.io.Serializable; */ public final class LeaderTransitioning implements Serializable { private static final long serialVersionUID = 1L; - public static final LeaderTransitioning INSTANCE = new LeaderTransitioning(); - private LeaderTransitioning() { - // Hidden on purpose + private final String leaderId; + + public LeaderTransitioning(@Nonnull final String leaderId) { + this.leaderId = Preconditions.checkNotNull(leaderId); + } + + @Nonnull + public String getLeaderId() { + return leaderId; } - private Object readResolve() { - return INSTANCE; + @Override + public String toString() { + return "LeaderTransitioning [leaderId=" + leaderId + "]"; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java index 76ef5f6228..83f5513d7e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java @@ -8,19 +8,23 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; import akka.actor.ActorRef; import akka.actor.Props; import akka.pattern.Patterns; import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import 
java.util.concurrent.TimeUnit; import org.junit.Test; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -79,23 +83,37 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat clearMessages(follower2NotifierActor); clearMessages(follower3NotifierActor); + // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id. + final TestRaftActor follower2Instance = follower2Actor.underlyingActor(); + follower2Instance.startDropMessages(LeaderTransitioning.class); + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); final Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); - assertNullLeaderIdChange(leaderNotifierActor); - assertNullLeaderIdChange(follower1NotifierActor); - assertNullLeaderIdChange(follower2NotifierActor); - assertNullLeaderIdChange(follower3NotifierActor); - verifyRaftState(follower1Actor, RaftState.Leader); Boolean stopped = Await.result(stopFuture, duration); assertEquals("Stopped", Boolean.TRUE, stopped); - follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); + // Re-enable LeaderTransitioning messages to follower2. 
+ final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor, + LeaderTransitioning.class); + follower2Instance.stopDropMessages(LeaderTransitioning.class); + + follower2Instance.stopDropMessages(AppendEntries.class); ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class); assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex()); + // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader. + follower2Actor.tell(leaderTransitioning, ActorRef.noSender()); + + verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id); + verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id); + // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message + // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set. + verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id); + verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id); + testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending"); } @@ -166,9 +184,16 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState())); } - private static void assertNullLeaderIdChange(TestActorRef notifierActor) { - LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class); - assertNull("Expected null leader Id", change.getLeaderId()); + private void verifyLeaderStateChangedMessages(TestActorRef notifierActor, + String... 
expLeaderIds) { + List leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class, + expLeaderIds.length); + + Collections.reverse(leaderStateChanges); + Iterator actual = leaderStateChanges.iterator(); + for (int i = expLeaderIds.length - 1; i >= 0; i--) { + assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId()); + } } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 8ec50754e8..9e35461261 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1259,7 +1259,7 @@ public class RaftActorTest extends AbstractActorTest { MessageCollectorActor.clearMessages(notifierActor); - raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender()); + raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender()); leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());