From 26c4b15f6b55f012e7e7cf6e94d335386615af5c Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Wed, 22 Jul 2015 20:34:30 -0700 Subject: [PATCH] BUG 2185 : Introduce the SwitchBehavior message RaftActor processes the SwitchBehavior message to change the behavior Candidate and IsolatedLeader behaviors are not allowed. Change-Id: Id8d758c6574a5c58927927b83bc5985081b19c50 Signed-off-by: Moiz Raja --- .../controller/cluster/raft/RaftActor.java | 51 ++++++++++++++++-- .../controller/cluster/raft/RaftState.java | 36 +++++++++++-- .../raft/base/messages/SwitchBehavior.java | 29 ++++++++++ .../behaviors/AbstractRaftActorBehavior.java | 3 +- .../DelegatingRaftActorBehavior.java | 5 ++ .../raft/behaviors/RaftActorBehavior.java | 9 ++++ .../cluster/raft/RaftActorTest.java | 54 +++++++++++++++++-- .../policy/DisableElectionsRaftPolicy.java | 26 +++++++++ 8 files changed, 199 insertions(+), 14 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index ebc157bc17..7bf76a70f2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -16,6 +16,7 @@ import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; import java.io.Serializable; import java.util.Collection; @@ -35,6 +36,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -114,6 +116,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); + private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier(); + public RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { @@ -190,7 +194,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @Override - public void handleCommand(Object message) { + public void handleCommand(final Object message) { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -234,15 +238,33 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { ); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); + } else if(message instanceof SwitchBehavior){ + switchBehavior(((SwitchBehavior) message)); } else if(!snapshotSupport.handleSnapshotMessage(message)) { - reusableBehaviorStateHolder.init(getCurrentBehavior()); - - setCurrentBehavior(currentBehavior.handleMessage(getSender(), message)); + switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + } + } - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + private void switchBehavior(SwitchBehavior message) { + if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { + RaftState newState = message.getNewState(); + if( newState == RaftState.Leader || newState == RaftState.Follower) { + switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); + } else { + LOG.warn("Switching to behavior : {} - not supported", newState); + } } } + private void switchBehavior(Supplier supplier){ + reusableBehaviorStateHolder.init(getCurrentBehavior()); + + setCurrentBehavior(supplier.get()); + + handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + } + protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { return new RaftActorSnapshotMessageSupport(context, currentBehavior, getRaftActorSnapshotCohort()); @@ -655,4 +677,23 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return leaderPayloadVersion; } } + + private class SwitchBehaviorSupplier implements Supplier { + private Object message; + private ActorRef sender; + + public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){ + this.sender = sender; + this.message = message; + return this; + } + + @Override + public RaftActorBehavior get() { + if(this.message instanceof SwitchBehavior){ + return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext()); + } + return currentBehavior.handleMessage(sender, message); + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java index 216ad4103d..4f694fcfcd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java @@ -1,8 +1,36 @@ package org.opendaylight.controller.cluster.raft; +import org.opendaylight.controller.cluster.raft.behaviors.Candidate; +import org.opendaylight.controller.cluster.raft.behaviors.Follower; +import org.opendaylight.controller.cluster.raft.behaviors.IsolatedLeader; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; + public enum RaftState { - Candidate, - Follower, - Leader, - IsolatedLeader; + Candidate { + @Override + public RaftActorBehavior createBehavior(RaftActorContext context) { + return new Candidate(context); + } + }, + Follower { + @Override + public RaftActorBehavior createBehavior(RaftActorContext context) { + return new Follower(context); + } + }, + Leader { + @Override + public RaftActorBehavior createBehavior(RaftActorContext context) { + return new Leader(context); + } + }, + IsolatedLeader { + @Override + public RaftActorBehavior createBehavior(RaftActorContext context) { + return new IsolatedLeader(context); + } + }; + + public abstract RaftActorBehavior createBehavior(RaftActorContext context); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java new file mode 100644 index 0000000000..b3c5461567 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SwitchBehavior.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.base.messages; + +import org.opendaylight.controller.cluster.raft.RaftState; + +public class SwitchBehavior { + private final RaftState newState; + private final long newTerm; + + public SwitchBehavior(RaftState newState, long newTerm) { + this.newState = newState; + this.newTerm = newTerm; + } + + public RaftState getNewState() { + return newState; + } + + public long getNewTerm() { + return newTerm; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index a1bc3eea65..8692e9948a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -418,7 +418,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { this.leaderPayloadVersion = leaderPayloadVersion; } - protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + @Override + public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state()); try { close(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java index 64bdc4a504..175e16f21b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java @@ -60,4 +60,9 @@ public class DelegatingRaftActorBehavior implements RaftActorBehavior { public short getLeaderPayloadVersion() { return delegate.getLeaderPayloadVersion(); } + + @Override + public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { + return delegate.switchBehavior(behavior); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index d04dec030b..b0a7638b92 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -65,4 +65,13 @@ public interface RaftActorBehavior extends AutoCloseable{ * @return the leader's payload data version. */ short getLeaderPayloadVersion(); + + /** + * switchBehavior makes sure that the current behavior is shutdown before it switches to the new + * behavior + * + * @param behavior The new behavior to switch to + * @return The new behavior + */ + RaftActorBehavior switchBehavior(RaftActorBehavior behavior); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 3275737cf7..0a8e3879bd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -17,6 +17,7 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.japi.Procedure; +import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotMetadata; @@ -51,6 +52,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -673,7 +675,7 @@ public class RaftActorTest extends AbstractActorTest { (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7, new MockRaftActorContext.MockPayload("foo-7")) ); - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0)); assertEquals(8, followerActor.getReplicatedLog().size()); assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state()); @@ -701,7 +703,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-7")) ); // send an additional entry 8 with leaderCommit = 7 - followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0)); + followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0)); // 7 and 8, as lastapplied is 7 assertEquals(2, followerActor.getReplicatedLog().size()); @@ -759,7 +761,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0)); assertEquals(5, leaderActor.getReplicatedLog().size()); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); @@ -792,7 +794,7 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); //reply from a slow follower after should not raise errors - leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0)); + leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0)); assertEquals(0, leaderActor.getReplicatedLog().size()); } }; @@ -889,6 +891,50 @@ public class RaftActorTest extends AbstractActorTest { }}; } + @Test + public void testSwitchBehavior(){ + String persistenceId = factory.generateActorId("leader-"); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setSnapshotBatchCount(5); + + DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider(); + + Map peerAddresses = ImmutableMap.builder().build(); + + TestActorRef mockActorRef = factory.createTestActor( + MockRaftActor.props(persistenceId, peerAddresses, + Optional.of(config), dataPersistenceProvider), persistenceId); + + MockRaftActor leaderActor = mockActorRef.underlyingActor(); + + leaderActor.handleRecover(RecoveryCompleted.getInstance()); + + leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100)); + + assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm()); + assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state()); + + leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110)); + + assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125)); + + assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125)); + + assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm()); + assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); + + + } + public static ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java new file mode 100644 index 0000000000..c9d965394a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/policy/DisableElectionsRaftPolicy.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.policy; + +/** + * DisableElectionsRaftPolicy can be useful for testing purposes where we may want to disable + * elections so that the Leaders for a RaftActor can be set externally. Modification to state would + * still require consensus. + */ +public class DisableElectionsRaftPolicy implements RaftPolicy { + @Override + public boolean automaticElectionsEnabled() { + return false; + } + + @Override + public boolean applyModificationToStateBeforeConsensus() { + return false; + } +} -- 2.36.6