From a37aef6c01f720b935535b11cf9d7689ceea9470 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Tue, 21 Jul 2015 16:42:47 -0700 Subject: [PATCH] BUG 2185 : Introduce RaftPolicy & DefaultRaftPolicy These allow for simple customization of the Raft algorithm. The intention is to support the 2-Node clustering scenario in which leaders for a Shard will need to be specified externally and where weak(er) consistency may be considered ok Change-Id: I16bc69a67ac3096082324f11e62565a7b9d7cc57 Signed-off-by: Moiz Raja (cherry picked from commit 0717961816253536d0ca107f4094de7f00c1ee54) --- .../cluster/raft/RaftActorContext.java | 6 +++ .../cluster/raft/RaftActorContextImpl.java | 8 ++++ .../raft/behaviors/AbstractLeader.java | 9 ++++- .../cluster/raft/behaviors/Follower.java | 10 +++-- .../raft/policy/DefaultRaftPolicy.java | 24 +++++++++++ .../cluster/raft/policy/RaftPolicy.java | 40 +++++++++++++++++++ .../cluster/raft/MockRaftActorContext.java | 12 ++++++ .../AbstractRaftActorBehaviorTest.java | 16 ++++++++ .../cluster/raft/behaviors/FollowerTest.java | 20 ++++++++++ .../cluster/raft/behaviors/LeaderTest.java | 39 ++++++++++++++++++ .../raft/utils/MessageCollectorActor.java | 23 +++++++++++ 11 files changed, 201 insertions(+), 6 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 175654a6c3..c4189820b3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; /** @@ -195,4 +196,9 @@ public interface RaftActorContext { * @return the payload version to be used when replicating data */ short getPayloadVersion(); + + /** + * @return an implementation of the RaftPolicy so that the Raft code can be adapted + */ + RaftPolicy getRaftPolicy(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 8fa579a5e7..815c554b9c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -18,6 +18,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.slf4j.Logger; public class RaftActorContextImpl implements RaftActorContext { @@ -202,4 +204,10 @@ public class RaftActorContextImpl implements RaftActorContext { public DataPersistenceProvider getPersistenceProvider() { return persistenceProvider; } + + + @Override + public RaftPolicy getRaftPolicy() { + return DefaultRaftPolicy.INSTANCE; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 94fa507b47..134cde8720 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -451,10 +451,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followerToLog.isEmpty()) { + boolean applyModificationToState = followerToLog.isEmpty() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); + + if(applyModificationToState){ context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { + } + + if (!followerToLog.isEmpty()) { sendAppendEntries(0, false); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index d16ddabdde..ede0f6af2c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -48,10 +48,12 @@ public class Follower extends AbstractRaftActorBehavior { initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); - if(context.getPeerAddresses().isEmpty()){ - actor().tell(ELECTION_TIMEOUT, actor()); - } else { - scheduleElection(electionDuration()); + if(context.getRaftPolicy().automaticElectionsEnabled()) { + if (context.getPeerAddresses().isEmpty()) { + actor().tell(ELECTION_TIMEOUT, actor()); + } else { + scheduleElection(electionDuration()); + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java new file mode 100644 index 0000000000..8e71045cbc --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.policy; + +public class DefaultRaftPolicy implements RaftPolicy { + + public static final RaftPolicy INSTANCE = new DefaultRaftPolicy(); + + @Override + public boolean automaticElectionsEnabled() { + return true; + } + + @Override + public boolean applyModificationToStateBeforeConsensus() { + return false; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java new file mode 100644 index 0000000000..a66caa456a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.policy; + +/** + * The RaftPolicy is intended to change the default behavior of Raft. For example + * we may want to be able to determine which Raft replica should become the leader - with Raft elections are + * randomized so it is not possible to specify which replica should be the leader. The ability to specify + * the leader would be quite useful when testing a raft cluster. + * + * Similarly we may want to customize when exactly we apply a modification to the state - with Raft a modification + * is only applied to the state when the modification is replicated to a majority of the replicas. The ability to + * apply a modification to the state before consensus would be useful in scenarios where you have only 2 nodes + * in a Raft cluster and one of them is down but you still want the RaftActor to apply a modification to the state. + * + */ +public interface RaftPolicy { + /** + * According to Raft a Follower which does not receive a heartbeat (aka AppendEntries) in a given period should + * become a Candidate and trigger an election. + * + * @return true to enable automatic Raft elections, false to disable them + */ + boolean automaticElectionsEnabled(); + + /** + * According to Raft consensus on a Raft entry is achieved only after a Leader replicates a log entry to a + * majority of it's followers + * + * @return true if modification should be applied before consensus, false to apply modification to state + * as per Raft + */ + boolean applyModificationToStateBeforeConsensus(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 31a12d5659..1938dab98e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages; @@ -42,6 +44,7 @@ public class MockRaftActorContext implements RaftActorContext { private SnapshotManager snapshotManager; private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider(); private short payloadVersion; + private RaftPolicy raftPolicy = DefaultRaftPolicy.INSTANCE; public MockRaftActorContext(){ electionTerm = new ElectionTerm() { @@ -238,6 +241,15 @@ public class MockRaftActorContext implements RaftActorContext { return payloadVersion; } + @Override + public RaftPolicy getRaftPolicy() { + return this.raftPolicy; + } + + public void setRaftPolicy(RaftPolicy raftPolicy){ + this.raftPolicy = raftPolicy; + } + public void setPayloadVersion(short payloadVersion) { this.payloadVersion = payloadVersion; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index d3f5a0eead..14205aba5c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -15,6 +15,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -349,4 +350,19 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { protected void logStart(String name) { LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name); } + + protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled, + final boolean applyModificationToStateBeforeConsensus){ + return new RaftPolicy() { + @Override + public boolean automaticElectionsEnabled() { + return automaticElectionsEnabled; + } + + @Override + public boolean applyModificationToStateBeforeConsensus() { + return applyModificationToStateBeforeConsensus; + } + }; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index ce1b46e9fd..1e931109c3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; public class FollowerTest extends AbstractRaftActorBehaviorTest { @@ -782,6 +784,24 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); } + @Test + public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){ + MockRaftActorContext context = createActorContext(); + context.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public FiniteDuration getElectionTimeOutInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + context.setRaftPolicy(createRaftPolicy(false, false)); + + follower = createBehavior(context); + + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500); + } + + public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 89b83b3b36..ee6e5776e1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -170,6 +170,45 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setRaftPolicy(createRaftPolicy(true, true)); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short) 0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); + + // State should not change + assertTrue(raftBehavior instanceof Leader); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex()); } @Test diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 6ea7a20924..b0773bf7ca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -108,6 +108,7 @@ public class MessageCollectorActor extends UntypedActor { return expectFirstMatching(actor, clazz, 5000); } + public static T expectFirstMatching(ActorRef actor, Class clazz, long timeout) { int count = (int) (timeout / 50); for(int i = 0; i < count; i++) { @@ -147,6 +148,28 @@ public class MessageCollectorActor extends UntypedActor { return null; } + public static void assertNoneMatching(ActorRef actor, Class clazz) { + assertNoneMatching(actor, clazz, 5000); + } + + public static void assertNoneMatching(ActorRef actor, Class clazz, long timeout) { + int count = (int) (timeout / 50); + for(int i = 0; i < count; i++) { + try { + T message = getFirstMatching(actor, clazz); + if(message != null) { + Assert.fail("Unexpected message received" + message.toString()); + return; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + return; + } + + public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { List allMessages = getAllMessages(actor); -- 2.36.6