Add LeaderTransitioning message to RaftActor 07/31507/3
authorTom Pantelis <tpanteli@brocade.com>
Tue, 15 Dec 2015 22:23:01 +0000 (17:23 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 21 Dec 2015 11:31:58 +0000 (06:31 -0500)
Adds a LeaderTransitioning message that sends a LeaderStateChanged
message to the RoleChangeNotifier actor with a null leaderId. This will
be used in subsequent patches for leadership transfer. This message will
be sent from the leader to its followers. The resulting
LeaderStateChanged message causes the ShardManager to clear its cached
leader info for the shard so subsequent transactions will wait for the new
leader to come on line.

Change-Id: I2a91374992687a478af1c76b74128e7b0f813c45
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java

index 2caba55a23063fea7214d8e4bf7505afaafeac1d..3afaad857204b75ebb1fe36ca9383d0fe9a0e76d 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
@@ -250,11 +251,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
+        } else if(message instanceof LeaderTransitioning) {
+            onLeaderTransitioning();
         } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
 
         } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
 
+    private void onLeaderTransitioning() {
+        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+        Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+        if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+            roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+                    currentBehavior.getLeaderPayloadVersion()), getSelf());
+        }
+    }
+
     private void switchBehavior(SwitchBehavior message) {
         if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
     private void switchBehavior(SwitchBehavior message) {
         if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/LeaderTransitioning.java
new file mode 100644 (file)
index 0000000..4974f8f
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from a leader to its followers to indicate leadership transfer is starting.
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderTransitioning implements Serializable {
+    private static final long serialVersionUID = 1L;
+}
index e19135cbcb0710f676470da9af0ded40565a48ca..f4846d2589ad050632b47bc24c15b451e818b218 100644 (file)
@@ -71,6 +71,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
@@ -1332,4 +1333,39 @@ public class RaftActorTest extends AbstractActorTest {
 
         TEST_LOG.info("testNonVotingOnRecovery ending");
     }
 
         TEST_LOG.info("testNonVotingOnRecovery ending");
     }
+
+    @Test
+    public void testLeaderTransitioning() throws Exception {
+        TEST_LOG.info("testLeaderTransitioning starting");
+
+        TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
+                0L, -1L, (short)1), ActorRef.noSender());
+        LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                notifierActor, LeaderStateChanged.class);
+        assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
+
+        MessageCollectorActor.clearMessages(notifierActor);
+
+        raftActorRef.tell(new LeaderTransitioning(), ActorRef.noSender());
+
+        leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+        assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
+        assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
+
+        TEST_LOG.info("testLeaderTransitioning ending");
+    }
 }
 }