BUG 2185 : Introduce RaftPolicy & DefaultRaftPolicy 49/24549/1
authorMoiz Raja <moraja@cisco.com>
Tue, 21 Jul 2015 23:42:47 +0000 (16:42 -0700)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 24 Jul 2015 21:36:55 +0000 (21:36 +0000)
These allow for simple customization of the Raft algorithm. The intention is
to support the 2-Node clustering scenario in which leaders for a Shard will need
to be specified externally and where weak(er) consistency may be considered ok

Change-Id: I16bc69a67ac3096082324f11e62565a7b9d7cc57
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 0717961816253536d0ca107f4094de7f00c1ee54)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java

index 175654a6c3fe594a4f7fe848aa2768885f4b4b05..c4189820b33bc6b94ae9f05a5b28450d39684079 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import com.google.common.base.Supplier;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
 
 /**
 import org.slf4j.Logger;
 
 /**
@@ -195,4 +196,9 @@ public interface RaftActorContext {
      * @return the payload version to be used when replicating data
      */
     short getPayloadVersion();
      * @return the payload version to be used when replicating data
      */
     short getPayloadVersion();
+
+    /**
+     * @return an implementation of the RaftPolicy so that the Raft code can be adapted
+     */
+    RaftPolicy getRaftPolicy();
 }
 }
index 8fa579a5e7675e17f287c0d82a36a6567e81f983..815c554b9c9ca80632111a240c845860e04c6f27 100644 (file)
@@ -18,6 +18,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import com.google.common.base.Supplier;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
 import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
@@ -202,4 +204,10 @@ public class RaftActorContextImpl implements RaftActorContext {
     public DataPersistenceProvider getPersistenceProvider() {
         return persistenceProvider;
     }
     public DataPersistenceProvider getPersistenceProvider() {
         return persistenceProvider;
     }
+
+
+    @Override
+    public RaftPolicy getRaftPolicy() {
+        return DefaultRaftPolicy.INSTANCE;
+    }
 }
 }
index 94fa507b47aff41a6d4308ca687db8639c955559..134cde8720517ed447fb5e3275347667a4068c7f 100644 (file)
@@ -451,10 +451,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 logIndex)
         );
 
                 logIndex)
         );
 
-        if (followerToLog.isEmpty()) {
+        boolean applyModificationToState = followerToLog.isEmpty()
+                || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
+
+        if(applyModificationToState){
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
-        } else {
+        }
+
+        if (!followerToLog.isEmpty()) {
             sendAppendEntries(0, false);
         }
     }
             sendAppendEntries(0, false);
         }
     }
index d16ddabddee988df72c90866a05486312ecc48d9..ede0f6af2c9cb11ce005aa76a9a047b97ef5d29c 100644 (file)
@@ -48,10 +48,12 @@ public class Follower extends AbstractRaftActorBehavior {
 
         initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
 
 
         initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
 
-        if(context.getPeerAddresses().isEmpty()){
-            actor().tell(ELECTION_TIMEOUT, actor());
-        } else {
-            scheduleElection(electionDuration());
+        if(context.getRaftPolicy().automaticElectionsEnabled()) {
+            if (context.getPeerAddresses().isEmpty()) {
+                actor().tell(ELECTION_TIMEOUT, actor());
+            } else {
+                scheduleElection(electionDuration());
+            }
         }
 
     }
         }
 
     }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/DefaultRaftPolicy.java
new file mode 100644 (file)
index 0000000..8e71045
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.policy;
+
+public class DefaultRaftPolicy implements RaftPolicy {
+
+    public static final RaftPolicy INSTANCE = new DefaultRaftPolicy();
+
+    @Override
+    public boolean automaticElectionsEnabled() {
+        return true;
+    }
+
+    @Override
+    public boolean applyModificationToStateBeforeConsensus() {
+        return false;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/policy/RaftPolicy.java
new file mode 100644 (file)
index 0000000..a66caa4
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.policy;
+
+/**
+ * The RaftPolicy is intended to change the default behavior of Raft. For example
+ * we may want to be able to determine which Raft replica should become the leader - with Raft elections are
+ * randomized so it is not possible to specify which replica should be the leader. The ability to specify
+ * the leader would be quite useful when testing a raft cluster.
+ *
+ * Similarly we may want to customize when exactly we apply a modification to the state - with Raft a modification
+ * is only applied to the state when the modification is replicated to a majority of the replicas. The ability to
+ * apply a modification to the state before consensus would be useful in scenarios where you have only 2 nodes
+ * in a Raft cluster and one of them is down but you still want the RaftActor to apply a modification to the state.
+ *
+ */
+public interface RaftPolicy {
+    /**
+     * According to Raft a Follower which does not receive a heartbeat (aka AppendEntries) in a given period should
+     * become a Candidate and trigger an election.
+     *
+     * @return true to enable automatic Raft elections, false to disable them
+     */
+    boolean automaticElectionsEnabled();
+
+    /**
+     * According to Raft consensus on a Raft entry is achieved only after a Leader replicates a log entry to a
+     * majority of it's followers
+     *
+     * @return true if modification should be applied before consensus, false to apply modification to state
+     * as per Raft
+     */
+    boolean applyModificationToStateBeforeConsensus();
+}
index 31a12d5659dcef801dce2bfa14357d9f53e58838..1938dab98e0ae3eb9deb43a85748e9d73714792a 100644 (file)
@@ -21,6 +21,8 @@ import java.util.HashMap;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
@@ -42,6 +44,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private SnapshotManager snapshotManager;
     private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
     private short payloadVersion;
     private SnapshotManager snapshotManager;
     private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
     private short payloadVersion;
+    private RaftPolicy raftPolicy = DefaultRaftPolicy.INSTANCE;
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
@@ -238,6 +241,15 @@ public class MockRaftActorContext implements RaftActorContext {
         return payloadVersion;
     }
 
         return payloadVersion;
     }
 
+    @Override
+    public RaftPolicy getRaftPolicy() {
+        return this.raftPolicy;
+    }
+
+    public void setRaftPolicy(RaftPolicy raftPolicy){
+        this.raftPolicy = raftPolicy;
+    }
+
     public void setPayloadVersion(short payloadVersion) {
         this.payloadVersion = payloadVersion;
     }
     public void setPayloadVersion(short payloadVersion) {
         this.payloadVersion = payloadVersion;
     }
index d3f5a0eead1ff3ccdd2e69f215eccfb6886143b4..14205aba5cd5364614e4bbe6bb17cb274433ca69 100644 (file)
@@ -15,6 +15,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -349,4 +350,19 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     protected void logStart(String name) {
         LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
     }
     protected void logStart(String name) {
         LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
     }
+
+    protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled,
+                                          final boolean applyModificationToStateBeforeConsensus){
+        return new RaftPolicy() {
+            @Override
+            public boolean automaticElectionsEnabled() {
+                return automaticElectionsEnabled;
+            }
+
+            @Override
+            public boolean applyModificationToStateBeforeConsensus() {
+                return applyModificationToStateBeforeConsensus;
+            }
+        };
+    }
 }
 }
index ce1b46e9fd0d400e8467f091fd4621eb3548bbfc..1e931109c3e9234e73cf3c94f31b1a3e7ecf3978 100644 (file)
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -37,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
@@ -782,6 +784,24 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
         assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
     }
 
+    @Test
+    public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
+        MockRaftActorContext context = createActorContext();
+        context.setConfigParams(new DefaultConfigParamsImpl(){
+            @Override
+            public FiniteDuration getElectionTimeOutInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        context.setRaftPolicy(createRaftPolicy(false, false));
+
+        follower = createBehavior(context);
+
+        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
+    }
+
+
     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
         int snapshotLength = bs.size();
         int start = offset;
     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
         int snapshotLength = bs.size();
         int start = offset;
index 89b83b3b369e8e5ed70349e2a1c8fcd223d7c9b4..ee6e5776e15161c693ebd8aab83a6895f501353f 100644 (file)
@@ -170,6 +170,45 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
         assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+        assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+    }
+
+    @Test
+    public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+        logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setRaftPolicy(createRaftPolicy(true, true));
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
+
+        // State should not change
+        assertTrue(raftBehavior instanceof Leader);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+        assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
     }
 
     @Test
     }
 
     @Test
index 6ea7a20924d4480b0497c02e715eefddee81556c..b0773bf7ca1ddbe92b6749ca7ec94d7b5e3bb9e1 100644 (file)
@@ -108,6 +108,7 @@ public class MessageCollectorActor extends UntypedActor {
         return expectFirstMatching(actor, clazz, 5000);
     }
 
         return expectFirstMatching(actor, clazz, 5000);
     }
 
+
     public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, long timeout) {
         int count = (int) (timeout / 50);
         for(int i = 0; i < count; i++) {
     public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, long timeout) {
         int count = (int) (timeout / 50);
         for(int i = 0; i < count; i++) {
@@ -147,6 +148,28 @@ public class MessageCollectorActor extends UntypedActor {
         return null;
     }
 
         return null;
     }
 
+    public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz) {
+        assertNoneMatching(actor, clazz, 5000);
+    }
+
+    public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz, long timeout) {
+        int count = (int) (timeout / 50);
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = getFirstMatching(actor, clazz);
+                if(message != null) {
+                    Assert.fail("Unexpected message received" +  message.toString());
+                    return;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        return;
+    }
+
+
     public static <T> List<T> getAllMatching(ActorRef actor, Class<T> clazz) throws Exception {
         List<Object> allMessages = getAllMessages(actor);
 
     public static <T> List<T> getAllMatching(ActorRef actor, Class<T> clazz) throws Exception {
         List<Object> allMessages = getAllMessages(actor);