import com.google.common.base.Supplier;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
/**
* @return the payload version to be used when replicating data
*/
short getPayloadVersion();
+
+ /**
+ * @return an implementation of the RaftPolicy so that the Raft code can be adapted
+ */
+ RaftPolicy getRaftPolicy();
}
import com.google.common.base.Supplier;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.slf4j.Logger;
public class RaftActorContextImpl implements RaftActorContext {
public DataPersistenceProvider getPersistenceProvider() {
return persistenceProvider;
}
+
+
+ @Override
+ public RaftPolicy getRaftPolicy() {
+ return DefaultRaftPolicy.INSTANCE;
+ }
}
logIndex)
);
- if (followerToLog.isEmpty()) {
+ boolean applyModificationToState = followerToLog.isEmpty()
+ || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
+
+ if(applyModificationToState){
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
- } else {
+ }
+
+ if (!followerToLog.isEmpty()) {
sendAppendEntries(0, false);
}
}
initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
- if(context.getPeerAddresses().isEmpty()){
- actor().tell(ELECTION_TIMEOUT, actor());
- } else {
- scheduleElection(electionDuration());
+ if(context.getRaftPolicy().automaticElectionsEnabled()) {
+ if (context.getPeerAddresses().isEmpty()) {
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.policy;
+
+public class DefaultRaftPolicy implements RaftPolicy {
+
+ public static final RaftPolicy INSTANCE = new DefaultRaftPolicy();
+
+ @Override
+ public boolean automaticElectionsEnabled() {
+ return true;
+ }
+
+ @Override
+ public boolean applyModificationToStateBeforeConsensus() {
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.policy;
+
+/**
+ * The RaftPolicy is intended to change the default behavior of Raft. For example
+ * we may want to be able to determine which Raft replica should become the leader - with Raft elections are
+ * randomized so it is not possible to specify which replica should be the leader. The ability to specify
+ * the leader would be quite useful when testing a raft cluster.
+ *
+ * Similarly we may want to customize when exactly we apply a modification to the state - with Raft a modification
+ * is only applied to the state when the modification is replicated to a majority of the replicas. The ability to
+ * apply a modification to the state before consensus would be useful in scenarios where you have only 2 nodes
+ * in a Raft cluster and one of them is down but you still want the RaftActor to apply a modification to the state.
+ *
+ */
+public interface RaftPolicy {
+ /**
+ * According to Raft a Follower which does not receive a heartbeat (aka AppendEntries) in a given period should
+ * become a Candidate and trigger an election.
+ *
+ * @return true to enable automatic Raft elections, false to disable them
+ */
+ boolean automaticElectionsEnabled();
+
+ /**
+ * According to Raft consensus on a Raft entry is achieved only after a Leader replicates a log entry to a
+ * majority of it's followers
+ *
+ * @return true if modification should be applied before consensus, false to apply modification to state
+ * as per Raft
+ */
+ boolean applyModificationToStateBeforeConsensus();
+}
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
private SnapshotManager snapshotManager;
private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
private short payloadVersion;
+ private RaftPolicy raftPolicy = DefaultRaftPolicy.INSTANCE;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
return payloadVersion;
}
+ @Override
+ public RaftPolicy getRaftPolicy() {
+ return this.raftPolicy;
+ }
+
+ public void setRaftPolicy(RaftPolicy raftPolicy){
+ this.raftPolicy = raftPolicy;
+ }
+
public void setPayloadVersion(short payloadVersion) {
this.payloadVersion = payloadVersion;
}
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
protected void logStart(String name) {
LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
}
+
+ protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled,
+ final boolean applyModificationToStateBeforeConsensus){
+ return new RaftPolicy() {
+ @Override
+ public boolean automaticElectionsEnabled() {
+ return automaticElectionsEnabled;
+ }
+
+ @Override
+ public boolean applyModificationToStateBeforeConsensus() {
+ return applyModificationToStateBeforeConsensus;
+ }
+ };
+ }
}
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
public class FollowerTest extends AbstractRaftActorBehaviorTest {
assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
}
+ @Test
+ public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
+ MockRaftActorContext context = createActorContext();
+ context.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ context.setRaftPolicy(createRaftPolicy(false, false));
+
+ follower = createBehavior(context);
+
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
+ }
+
+
public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
int snapshotLength = bs.size();
int start = offset;
assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+ }
+
+ @Test
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setRaftPolicy(createRaftPolicy(true, true));
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
}
@Test
return expectFirstMatching(actor, clazz, 5000);
}
+
public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, long timeout) {
int count = (int) (timeout / 50);
for(int i = 0; i < count; i++) {
return null;
}
+ public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz) {
+ assertNoneMatching(actor, clazz, 5000);
+ }
+
+ public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz, long timeout) {
+ int count = (int) (timeout / 50);
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = getFirstMatching(actor, clazz);
+ if(message != null) {
+ Assert.fail("Unexpected message received" + message.toString());
+ return;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ return;
+ }
+
+
public static <T> List<T> getAllMatching(ActorRef actor, Class<T> clazz) throws Exception {
List<Object> allMessages = getAllMessages(actor);