Bug 7391: Fix out-of-order LeaderStateChange events 01/49701/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 21 Dec 2016 14:20:36 +0000 (09:20 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 22 Dec 2016 13:28:47 +0000 (13:28 +0000)
On leader transition, the current leader first sends out
LeaderTransitioning events to each follower to tell them the
current leader is being deposed. The followers send out a
LeaderStateChange event with a null leaderId which is picked
up by the ShardManager to delay subsequent transaction activity
to the shard until a new leader is elected. However, it's possible
the LeaderTransitioning message does not reach a follower until after
the leader transition occurs (e.g. due to dispatching delay in the
caller or the network). This results in a LeaderStateChange event
with a null leaderId being delivered after the LeaderStateChange
with the new leaderId. I wrote a unit test that reproduces it.

We need to handle LeaderTransitioning events in a CAS-like
(compare-and-set) manner, i.e. include the leaderId with the
LeaderTransitioning message and only issue the LeaderStateChange
event with a null leaderId if the current leaderId matches the
leaderId in the LeaderTransitioning message.

Change-Id: I24e8bbf7707858ac4ed62f3a979cc0403daff8ac
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java

index 1c08b20f2df27842c5050a6d7bda9c83e9370a91..fff6ce9ed17db138af3ec73be83ddcd4d324f102 100644 (file)
@@ -278,7 +278,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SwitchBehavior) {
             switchBehavior((SwitchBehavior) message);
         } else if (message instanceof LeaderTransitioning) {
-            onLeaderTransitioning();
+            onLeaderTransitioning((LeaderTransitioning)message);
         } else if (message instanceof Shutdown) {
             onShutDown();
         } else if (message instanceof Runnable) {
@@ -376,10 +376,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void onLeaderTransitioning() {
-        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
index b087914deccf23837137ba36847d269b4bcfc7aa..32790d0f47251af840193ebfe77fa51c4702ffd1 100644 (file)
@@ -79,7 +79,7 @@ public class RaftActorLeadershipTransferCohort {
         for (String peerId: context.getPeerIds()) {
             ActorSelection followerActor = context.getPeerActorSelection(peerId);
             if (followerActor != null) {
-                followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
+                followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
             }
         }
 
index 32e24b704f358309887d931436bf75e8dfff2e96..fd2fb686773d6caa292a352091ad9ab28f5c7eaf 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import com.google.common.base.Preconditions;
 import java.io.Serializable;
+import javax.annotation.Nonnull;
 
 /**
  * Message sent from a leader to its followers to indicate leadership transfer is starting.
@@ -16,13 +18,20 @@ import java.io.Serializable;
  */
 public final class LeaderTransitioning implements Serializable {
     private static final long serialVersionUID = 1L;
-    public static final LeaderTransitioning INSTANCE = new LeaderTransitioning();
 
-    private LeaderTransitioning() {
-        // Hidden on purpose
+    private final String leaderId;
+
+    public LeaderTransitioning(@Nonnull final String leaderId) {
+        this.leaderId = Preconditions.checkNotNull(leaderId);
+    }
+
+    @Nonnull
+    public String getLeaderId() {
+        return leaderId;
     }
 
-    private Object readResolve() {
-        return INSTANCE;
+    @Override
+    public String toString() {
+        return "LeaderTransitioning [leaderId=" + leaderId + "]";
     }
 }
index 76ef5f6228a37509a067f67c0e5858ca8d5191c2..83f5513d7e9dd37516c5285be0dedea4a2f9eb1a 100644 (file)
@@ -8,19 +8,23 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -79,23 +83,37 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         clearMessages(follower2NotifierActor);
         clearMessages(follower3NotifierActor);
 
+        // Simulate a delay for follower2 in receiving the LeaderTransitioning message from the outgoing leader.
+        final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
+        follower2Instance.startDropMessages(LeaderTransitioning.class);
+
         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
         final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
 
-        assertNullLeaderIdChange(leaderNotifierActor);
-        assertNullLeaderIdChange(follower1NotifierActor);
-        assertNullLeaderIdChange(follower2NotifierActor);
-        assertNullLeaderIdChange(follower3NotifierActor);
-
         verifyRaftState(follower1Actor, RaftState.Leader);
 
         Boolean stopped = Await.result(stopFuture, duration);
         assertEquals("Stopped", Boolean.TRUE, stopped);
 
-        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        // Re-enable LeaderTransitioning messages to follower2.
+        final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
+                LeaderTransitioning.class);
+        follower2Instance.stopDropMessages(LeaderTransitioning.class);
+
+        follower2Instance.stopDropMessages(AppendEntries.class);
         ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
         assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
 
+        // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
+        follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
+
+        verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
+        verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
+        // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
+        // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
+        verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
+        verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
+
         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
     }
 
@@ -166,9 +184,16 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
     }
 
-    private static void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
-        LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
-        assertNull("Expected null leader Id", change.getLeaderId());
+    private void verifyLeaderStateChangedMessages(TestActorRef<MessageCollectorActor> notifierActor,
+            String... expLeaderIds) {
+        List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
+                expLeaderIds.length);
+
+        Collections.reverse(leaderStateChanges);
+        Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
+        for (int i = expLeaderIds.length - 1; i >= 0; i--) {
+            assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
+        }
     }
 
     @Test
index 8ec50754e8ec35a83d27255936178d66462e2db4..9e354612616b7e39235e7335a7fe618ea7973243 100644 (file)
@@ -1259,7 +1259,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         MessageCollectorActor.clearMessages(notifierActor);
 
-        raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
+        raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender());
 
         leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
         assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());