BUG 2185 : Introduce the SwitchBehavior message 59/24459/5
authorMoiz Raja <moraja@cisco.com>
Thu, 23 Jul 2015 03:34:30 +0000 (20:34 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 11 Aug 2015 17:21:06 +0000 (17:21 +0000)
RaftActor processes the SwitchBehavior message to change the behavior
Candidate and IsolatedLeader behaviors are not allowed.

Change-Id: Id8d758c6574a5c58927927b83bc5985081b19c50
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java [new file with mode: 0644]

index ebc157bc1729aff08199b5d6fe4a17b04afdc552..7bf76a70f21139e2a8c1087bd66b8463225320ff 100644 (file)
@@ -16,6 +16,7 @@ import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Collection;
@@ -35,6 +36,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -114,6 +116,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
+    private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
+
     public RaftActor(String id, Map<String, String> peerAddresses,
          Optional<ConfigParams> configParams, short payloadVersion) {
 
@@ -190,7 +194,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @Override
-    public void handleCommand(Object message) {
+    public void handleCommand(final Object message) {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -234,15 +238,33 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             );
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
+        } else if(message instanceof SwitchBehavior){
+            switchBehavior(((SwitchBehavior) message));
         } else if(!snapshotSupport.handleSnapshotMessage(message)) {
-            reusableBehaviorStateHolder.init(getCurrentBehavior());
-
-            setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
+            switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+        }
+    }
 
-            handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+    private void switchBehavior(SwitchBehavior message) {
+        if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+            RaftState newState = message.getNewState();
+            if( newState == RaftState.Leader || newState == RaftState.Follower) {
+                switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+                getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
+            } else {
+                LOG.warn("Switching to behavior : {} - not supported", newState);
+            }
         }
     }
 
+    private void switchBehavior(Supplier<RaftActorBehavior> supplier){
+        reusableBehaviorStateHolder.init(getCurrentBehavior());
+
+        setCurrentBehavior(supplier.get());
+
+        handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+    }
+
     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
         return new RaftActorSnapshotMessageSupport(context, currentBehavior,
                 getRaftActorSnapshotCohort());
@@ -655,4 +677,23 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             return leaderPayloadVersion;
         }
     }
+
+    private class SwitchBehaviorSupplier implements Supplier<RaftActorBehavior> {
+        private Object message;
+        private ActorRef sender;
+
+        public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){
+            this.sender = sender;
+            this.message = message;
+            return this;
+        }
+
+        @Override
+        public RaftActorBehavior get() {
+            if(this.message instanceof SwitchBehavior){
+                return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext());
+            }
+            return currentBehavior.handleMessage(sender, message);
+        }
+    }
 }
index 216ad4103dc4409182e0c590e63afdd2b34e2aa0..4f694fcfcd7a0cf1494e082fc9e0ada278125ea9 100644 (file)
@@ -1,8 +1,36 @@
 package org.opendaylight.controller.cluster.raft;
 
+import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.IsolatedLeader;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
 public enum RaftState {
-    Candidate,
-    Follower,
-    Leader,
-    IsolatedLeader;
+    Candidate {
+        @Override
+        public RaftActorBehavior createBehavior(RaftActorContext context) {
+            return new Candidate(context);
+        }
+    },
+    Follower {
+        @Override
+        public RaftActorBehavior createBehavior(RaftActorContext context) {
+            return new Follower(context);
+        }
+    },
+    Leader {
+        @Override
+        public RaftActorBehavior createBehavior(RaftActorContext context) {
+            return new Leader(context);
+        }
+    },
+    IsolatedLeader {
+        @Override
+        public RaftActorBehavior createBehavior(RaftActorContext context) {
+            return new IsolatedLeader(context);
+        }
+    };
+
+    public abstract RaftActorBehavior createBehavior(RaftActorContext context);
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java
new file mode 100644 (file)
index 0000000..b3c5461
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+public class SwitchBehavior {
+    private final RaftState newState;
+    private final long newTerm;
+
+    public SwitchBehavior(RaftState newState, long newTerm) {
+        this.newState = newState;
+        this.newTerm = newTerm;
+    }
+
+    public RaftState getNewState() {
+        return newState;
+    }
+
+    public long getNewTerm() {
+        return newTerm;
+    }
+}
index a1bc3eea65a3003ff8d4ef38aa3a3cbbc44519a6..8692e9948a4def892062a0b21d6129cd0f29c35c 100644 (file)
@@ -418,7 +418,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         this.leaderPayloadVersion = leaderPayloadVersion;
     }
 
-    protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+    @Override
+    public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
         LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
         try {
             close();
index 64bdc4a504efd8f78f56bdb5debd3c1a2e251178..175e16f21b202aa23181cb7c34dd7ceb02dab700 100644 (file)
@@ -60,4 +60,9 @@ public class DelegatingRaftActorBehavior implements RaftActorBehavior {
     public short getLeaderPayloadVersion() {
         return delegate.getLeaderPayloadVersion();
     }
+
+    @Override
+    public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+        return delegate.switchBehavior(behavior);
+    }
 }
index d04dec030bbdcf65176e3f299b9da32597cc2ff8..b0a7638b92abdec6aef770cc6170a5c5353cc434 100644 (file)
@@ -65,4 +65,13 @@ public interface RaftActorBehavior extends AutoCloseable{
      * @return the leader's payload data version.
      */
     short getLeaderPayloadVersion();
+
+    /**
+     * switchBehavior makes sure that the current behavior is shutdown before it switches to the new
+     * behavior
+     *
+     * @param behavior The new behavior to switch to
+     * @return The new behavior
+     */
+    RaftActorBehavior switchBehavior(RaftActorBehavior behavior);
 }
index 3275737cf71b63a0e74c1eb4321ac6311e242f84..0a8e3879bdca0a92472c36a8376414a9c056333a 100644 (file)
@@ -17,6 +17,7 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -51,6 +52,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -673,7 +675,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
                 assertEquals(8, followerActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
@@ -701,7 +703,7 @@ public class RaftActorTest extends AbstractActorTest {
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                 // send an additional entry 8 with leaderCommit = 7
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
 
                 // 7 and 8, as lastapplied is 7
                 assertEquals(2, followerActor.getReplicatedLog().size());
@@ -759,7 +761,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
@@ -792,7 +794,7 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
                 assertEquals(0, leaderActor.getReplicatedLog().size());
             }
         };
@@ -889,6 +891,50 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testSwitchBehavior(){
+        String persistenceId = factory.generateActorId("leader-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setSnapshotBatchCount(5);
+
+        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+
+        Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
+
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+                MockRaftActor.props(persistenceId, peerAddresses,
+                        Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+        MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
+        leaderActor.handleRecover(RecoveryCompleted.getInstance());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
+
+        assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+
+    }
+
     public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java
new file mode 100644 (file)
index 0000000..c9d9653
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.policy;
+
+/**
+ * DisableElectionsRaftPolicy can be useful for testing purposes where we may want to disable
+ * elections so that the Leaders for a RaftActor can be set externally. Modification to state would
+ * still require consensus.
+ */
+public class DisableElectionsRaftPolicy implements RaftPolicy {
+    @Override
+    public boolean automaticElectionsEnabled() {
+        return false;
+    }
+
+    @Override
+    public boolean applyModificationToStateBeforeConsensus() {
+        return false;
+    }
+}