Bug 5504: Add PreLeader raft state 28/42728/4
author Tom Pantelis <tpanteli@brocade.com>
Wed, 27 Jul 2016 19:52:53 +0000 (15:52 -0400)
committer Tom Pantelis <tpanteli@brocade.com>
Fri, 5 Aug 2016 15:10:14 +0000 (15:10 +0000)
The following scenario can result in a "store tree and candidate base
differ" IllegalStateException on commit:

A follower receives a replicate and adds it to the log, say at index 1,
but the leader transfers or dies before committing and applying it to the
state. The follower becomes leader and when the next tx is applied, log
index 2, it has to first apply all log entries from the previous term that
hadn't been committed yet, in this case index 1. Since we got consensus for
index 2 that means index 1 has also been replicated to a majority. Therefore
ApplyState is sent for index 1 and then index 2. However index 1 is applied
as a "foreign" candidate while index 2 is in the pre-commit state. When
index 2 is applied the commit fails.

To prevent this scenario, we introduce a new raft state, PreLeader, which
a Candidate transitions to on winning an election if there are uncommitted
entries, i.e. commit index < last log index. The PreLeader state performs all
the duties of Leader with the added behavior of attempting to commit all
uncommitted entries from the previous leader's term. Raft does not allow a
leader to commit entries from a previous term by simply counting replicas -
only entries from the leader's current term can be committed (§5.4.2). Rather
than waiting for a client interaction to commit a new entry, the PreLeader
state immediately appends a no-op entry (NoopPayload) to the log with the
leader's current term. Once the no-op entry is committed, all prior entries
are committed indirectly. Once all entries are committed, i.e. commitIndex
matches the last log index, it switches to the normal Leader state.
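
The commit rule described above can be sketched as follows. This is a
minimal, self-contained illustration and not the sal-akka-raft code: the
class CommitRuleSketch, the method advanceCommitIndex and its array-based
parameters are hypothetical names chosen for the example.

```java
import java.util.Arrays;

// Hypothetical sketch of the Raft commit rule (current-term restriction).
class CommitRuleSketch {
    /**
     * Returns the new commit index for a leader.
     *
     * entryTerms[i] is the term of the leader's log entry at index i.
     * followerMatchIndex[f] is the highest index known to be replicated on
     * follower f, or -1 if nothing is replicated there yet.
     */
    static long advanceCommitIndex(long[] entryTerms, long[] followerMatchIndex,
            long commitIndex, long currentTerm) {
        int clusterSize = followerMatchIndex.length + 1; // followers plus the leader
        int majority = clusterSize / 2 + 1;
        long newCommitIndex = commitIndex;

        for (long n = commitIndex + 1; n < entryTerms.length; n++) {
            final long index = n;
            // The leader always holds its own entries, hence the leading 1.
            long replicas = 1 + Arrays.stream(followerMatchIndex)
                    .filter(match -> match >= index).count();
            if (replicas < majority) {
                break; // higher indexes cannot have more replicas
            }
            // Only an entry from the current term may move the commit index;
            // earlier entries are then committed indirectly along with it.
            if (entryTerms[(int) n] == currentTerm) {
                newCommitIndex = n;
            }
        }
        return newCommitIndex;
    }
}
```

With a single prior-term entry replicated to one of two followers,
advanceCommitIndex(new long[]{1}, new long[]{0, -1}, -1, 2) leaves the commit
index at -1. After a no-op with term 2 is appended and replicated,
advanceCommitIndex(new long[]{1, 2}, new long[]{1, -1}, -1, 2) returns 1,
committing both entries at once.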

The PreLeader state is considered an inactive leader state and thus
client transactions are delayed until it transitions to Leader.
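
The state transitions and the inactive-leader check can be sketched in
isolation as below. This is a simplified illustration under assumptions: the
class PreLeaderGateSketch, its nested State enum and the three static methods
are hypothetical names, and shutdown and leadership transfer are reduced to
plain booleans.

```java
// Hypothetical sketch of the PreLeader transitions and client gating.
class PreLeaderGateSketch {
    enum State { Candidate, Follower, Leader, IsolatedLeader, PreLeader }

    // A candidate that wins the election becomes PreLeader if its log holds
    // entries beyond the commit index, otherwise it becomes Leader directly.
    static State stateAfterWinningElection(long commitIndex, long lastLogIndex) {
        return commitIndex < lastLogIndex ? State.PreLeader : State.Leader;
    }

    // After processing a follower reply, a PreLeader whose commit index has
    // caught up with its last log index switches to the normal Leader state.
    static State stateAfterReply(State current, long commitIndex, long lastLogIndex) {
        if (current == State.PreLeader && commitIndex >= lastLogIndex) {
            return State.Leader;
        }
        return current;
    }

    // Client transactions are only accepted while the leader is active;
    // PreLeader counts as inactive, like IsolatedLeader.
    static boolean isLeaderActive(State state, boolean shuttingDown, boolean transferInProgress) {
        return state != State.IsolatedLeader && state != State.PreLeader
                && !shuttingDown && !transferInProgress;
    }
}
```

For example, a node that wins an election with commit index -1 and last log
index 0 enters PreLeader, reports isLeaderActive as false, and moves to Leader
once the commit index reaches the index of the no-op entry.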

Change-Id: I20a541de0eba9b0075b9952dc6d5808943b7bb8f
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
18 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 3037059c3d4803e5ae9bd946edc75208eb4b380d..90f23ddcfc1625f33998f5339572ee426522de91 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
@@ -235,8 +236,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     applyState.getReplicatedLogEntry().getData());
             }
 
-            applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                applyState.getReplicatedLogEntry().getData());
+            if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
+                applyState(applyState.getClientActor(), applyState.getIdentifier(),
+                    applyState.getReplicatedLogEntry().getData());
+            }
 
             long elapsedTime = System.nanoTime() - startTime;
             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
@@ -279,6 +282,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             onShutDown();
         } else if(message instanceof Runnable) {
             ((Runnable)message).run();
+        } else if(message instanceof NoopPayload) {
+            persistData(null, null, (NoopPayload)message);
         } else {
             // Processing the message may affect the state, hence we need to capture it
             final RaftActorBehavior currentBehavior = getCurrentBehavior();
@@ -536,7 +541,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 // the state to durable storage
                 self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
 
-            } else if (clientActor != null) {
+            } else {
                 context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
 
                 // Send message for replication
@@ -574,8 +579,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected final boolean isLeaderActive() {
-        return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
-                !isLeadershipTransferInProgress();
+        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader &&
+                !shuttingDown && !isLeadershipTransferInProgress();
     }
 
     private boolean isLeadershipTransferInProgress() {
index 32c0c18423102a419294070d3b58732a0c22ce4e..5c5b947404a6bc2e81f4ffed03180e42be36e6e0 100644 (file)
@@ -12,5 +12,6 @@ public enum RaftState {
     Candidate,
     Follower,
     Leader,
-    IsolatedLeader;
+    IsolatedLeader,
+    PreLeader;
 }
index f39351bcf20b5370e9f0ea256145295140b11189..94e08658e8c06d443921e80db090b939911ef6f2 100644 (file)
@@ -99,12 +99,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private Optional<SnapshotHolder> snapshot;
     private int minReplicationCount;
 
-    protected AbstractLeader(RaftActorContext context, RaftState state) {
+    protected AbstractLeader(RaftActorContext context, RaftState state,
+            @Nullable AbstractLeader initializeFromLeader) {
         super(context, state);
 
-        for(PeerInfo peerInfo: context.getPeers()) {
-            FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
-            followerToLog.put(peerInfo.getId(), followerLogInformation);
+        if(initializeFromLeader != null) {
+            followerToLog.putAll(initializeFromLeader.followerToLog);
+        } else {
+            for(PeerInfo peerInfo: context.getPeers()) {
+                FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+                followerToLog.put(peerInfo.getId(), followerLogInformation);
+            }
         }
 
         LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
@@ -123,6 +128,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
     }
 
+    protected AbstractLeader(RaftActorContext context, RaftState state) {
+        this(context, state, null);
+    }
+
     /**
      * Return an immutable collection of follower identifiers.
      *
@@ -529,16 +538,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
-                replicate.getIdentifier(), logIndex);
+        LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        trackers.add(
-            new ClientRequestTrackerImpl(replicate.getClientActor(),
-                replicate.getIdentifier(),
-                logIndex)
-        );
+        if(replicate.getClientActor() != null) {
+            trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
+                    logIndex));
+        }
 
         boolean applyModificationToState = !context.anyVotingPeers()
                 || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
index 59ba1a9d869d381318b7ba59c18165888367f1f9..943c4f97d5769f7fbc0835ec9999ab057b1e7577 100644 (file)
@@ -78,6 +78,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 return new IsolatedLeader(context);
             case Leader:
                 return new Leader(context);
+            case PreLeader:
+                return new PreLeader(context);
             default:
                 throw new IllegalArgumentException("Unhandled state " + state);
         }
@@ -428,7 +430,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return this;
     }
 
-    private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+    protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
         LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
         try {
             close();
index 1205c4bad66ed26e4a305d45612b2881464de533..90120e9ac36dae88b4b8a9cb39a1f251ba0b7070 100644 (file)
@@ -115,7 +115,13 @@ public class Candidate extends AbstractRaftActorBehavior {
         }
 
         if (voteCount >= votesRequired) {
-            return internalSwitchBehavior(RaftState.Leader);
+            if(context.getCommitIndex() < context.getReplicatedLog().lastIndex()) {
+                LOG.debug("{}: Commit index {} is behind last index {}", logName(), context.getCommitIndex(),
+                        context.getReplicatedLog().lastIndex());
+                return internalSwitchBehavior(RaftState.PreLeader);
+            } else {
+                return internalSwitchBehavior(RaftState.Leader);
+            }
         }
 
         return this;
index e9b45a3e5f373ac02ebd5e1c292a61a8ceb91f4e..58cf7161062c1eb67568930fb15252aadd21a23e 100644 (file)
@@ -55,8 +55,12 @@ public class Leader extends AbstractLeader {
     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
     private @Nullable LeadershipTransferContext leadershipTransferContext;
 
+    Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
+        super(context, RaftState.Leader, initializeFromLeader);
+    }
+
     public Leader(RaftActorContext context) {
-        super(context, RaftState.Leader);
+        this(context, null);
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java
new file mode 100644 (file)
index 0000000..e3ae4f4
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+
+/**
+ * The behavior of a RaftActor when it is in the PreLeader state. This state performs all the duties of
+ * Leader with the added behavior of attempting to commit all uncommitted entries from the previous leader's
+ * term. Raft does not allow a leader to commit entries from a previous term by simply counting replicas -
+ * only entries from the leader's current term can be committed (§5.4.2). Rather than waiting for a client
+ * interaction to commit a new entry, the PreLeader state immediately appends a no-op entry (NoopPayload) to
+ * the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed
+ * indirectly. Once all entries are committed, i.e. commitIndex matches the last log index, it switches to the
+ * normal Leader state.
+ * <p>
+ * The use of a no-op entry in this manner is outlined in the last paragraph in §8 of the
+ * <a href="https://raft.github.io/raft.pdf">extended raft version</a>.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLeader extends AbstractLeader {
+
+    public PreLeader(RaftActorContext context) {
+        super(context, RaftState.PreLeader);
+
+        context.getActor().tell(NoopPayload.INSTANCE, context.getActor());
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+        RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
+
+        if(context.getCommitIndex() >= context.getReplicatedLog().lastIndex()) {
+            // We've committed all entries - we can switch to Leader.
+            returnBehavior = internalSwitchBehavior(new Leader(context, this));
+        }
+
+        return returnBehavior;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java
new file mode 100644 (file)
index 0000000..2393a30
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Payload used for no-op log entries that are put into the journal by the PreLeader in order to commit
+ * entries from the prior term.
+ *
+ * @author Thomas Pantelis
+ */
+public final class NoopPayload extends Payload implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final NoopPayload INSTANCE = new NoopPayload();
+
+    private NoopPayload() {
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+
+    private Object readResolve() {
+        return INSTANCE;
+    }
+}
index 1a2ed4bb158fbe8614a991e2ebcdcc942756e971..2a39c29d537883440b4e4a4754f8d780e5b4246e 100644 (file)
@@ -84,11 +84,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             this.collectorActor = builder.collectorActor;
         }
 
-        void startDropMessages(Class<?> msgClass) {
+        public void startDropMessages(Class<?> msgClass) {
             dropMessages.put(msgClass, Boolean.TRUE);
         }
 
-        void stopDropMessages(Class<?> msgClass) {
+        public void stopDropMessages(Class<?> msgClass) {
             dropMessages.remove(msgClass);
         }
 
index 5a4b43721db99d1d2a548afceb00bcc47b7f37d8..a41c5013dea526b8b3d478f31a0c6f06793ba2df 100644 (file)
@@ -81,6 +81,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
         replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
         setReplicatedLog(replicatedLog);
+        setCommitIndex(replicatedLog.lastIndex());
     }
 
     @Override public ActorRef actorOf(Props props) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java
new file mode 100644 (file)
index 0000000..d451ea1
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests PreLeader raft state functionality end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLeaderScenarioTest extends AbstractRaftActorIntegrationTest {
+
+    private TestActorRef<Actor> follower1NotifierActor;
+    private DefaultConfigParamsImpl followerConfigParams;
+
+    @Test
+    public void testUnComittedEntryOnLeaderChange() throws Exception {
+        testLog.info("testUnComittedEntryOnLeaderChange starting");
+
+        createRaftActors();
+
+        // Drop AppendEntriesReply to the leader so it doesn't commit the payload entry.
+        leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        // Send a payload and verify AppendEntries is received in follower1.
+        MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+
+        AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+        assertEquals("AppendEntries - # entries", 1, appendEntries.getEntries().size());
+        verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 0, payload0);
+
+        // Kill the leader actor.
+        killActor(leaderActor);
+
+        // At this point, the payload entry is in follower1's log but is uncommitted. follower2 has not
+        // received the payload entry yet.
+        assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
+        assertEquals("Follower 1 journal last index", 0, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", -1, follower1Context.getCommitIndex());
+        assertEquals("Follower 1 last applied index", -1, follower1Context.getLastApplied());
+
+        assertEquals("Follower 2 journal log size", 0, follower2Context.getReplicatedLog().size());
+
+        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        clearMessages(follower1NotifierActor);
+
+        // Force follower1 to start an election. It should win since its journal is more up-to-date than
+        // follower2's journal.
+        follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+        // Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry.
+        List<RoleChanged> roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3);
+        assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole());
+        assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
+        assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
+
+        long previousTerm = currentTerm;
+        currentTerm = follower1Context.getTermInformation().getCurrentTerm();
+
+        // Since it went to Leader, it should've appended and successfully replicated a NoopPayload with the
+        // new term to follower2 and committed both entries, including the first payload from the previous term.
+        assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
+        assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
+
+        // Both entries should be applied to the state.
+        expectMatching(follower1CollectorActor, ApplyState.class, 2);
+        expectMatching(follower2CollectorActor, ApplyState.class, 2);
+
+        assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
+
+        // Verify follower2's journal matches follower1's.
+        assertEquals("Follower 2 journal log size", 2, follower2Context.getReplicatedLog().size());
+        assertEquals("Follower 2 journal last index", 1, follower2Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 2 commit index", 1, follower2Context.getCommitIndex());
+        assertEquals("Follower 2 last applied index", 1, follower2Context.getLastApplied());
+        verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
+        verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
+
+        // Reinstate follower1.
+        killActor(follower1Actor);
+
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                config(followerConfigParams));
+        follower1Actor.underlyingActor().waitForRecoveryComplete();
+        follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+
+        // Verify follower1's journal was persisted and recovered correctly.
+        assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
+        assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
+        assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
+
+        testLog.info("testUnComittedEntryOnLeaderChange ending");
+    }
+
+    private void createRaftActors() {
+        testLog.info("createRaftActors starting");
+
+        follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower1Id + "-notifier"));
+
+        followerConfigParams = newFollowerConfigParams();
+        followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).
+                put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+        waitUntilLeader(leaderActor);
+
+        expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
+
+        clearMessages(leaderCollectorActor);
+        clearMessages(follower1CollectorActor);
+        clearMessages(follower2CollectorActor);
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+        follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+
+        testLog.info("createRaftActors ending");
+    }
+}
index 6e431f28ad8957362987f02a80ff3762feaee5da..47d39dcc6111d1f4222bd736ed57ff533b30e471 100644 (file)
@@ -1299,6 +1299,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
                 new MockRaftActorContext.MockPayload("2")));
+        InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
 
         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
index 40080a8c72ba7978d365057fcd875897307048b4..c6773d83c2c94714a67fe01a84cb1e7c89eb242a 100644 (file)
@@ -126,6 +126,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         raftActorContext.getTermInformation().update(2L, "other");
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().
                 createEntries(0, 5, 1).build());
+        raftActorContext.setCommitIndex(raftActorContext.getReplicatedLog().lastIndex());
         raftActorContext.setPeerAddresses(setupPeers(4));
         candidate = new Candidate(raftActorContext);
 
index b11888f76f1784b03d92321136ef0150f6d17454..b25ba2fde2190d027649db350c7300708adb3158 100644 (file)
@@ -893,6 +893,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         logStart("testInitialSyncUpWithHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.setCommitIndex(-1);
 
         follower = createBehavior(context);
 
index 461dfe506db852afa47a7863cdc93f45e677840c..292562c92f8b9dc97ed2fd32734e9b18c357309c 100644 (file)
@@ -106,6 +106,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
         short payloadVersion = (short)5;
         actorContext.setPayloadVersion(payloadVersion);
 
@@ -201,6 +202,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
 
         // The raft context is initialized with a couple log entries. However the commitIndex
         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
@@ -1987,6 +1989,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 new FiniteDuration(1000, TimeUnit.SECONDS));
 
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
index 6c151f8c76e64a1fbf0b27a3894dfa6b3876d474..46743482928f92bc1e44f28c0df8c27923d8f721 100644 (file)
@@ -260,6 +260,7 @@ class EntityOwnershipShard extends Shard {
             case Candidate:
             case Follower:
             case Leader:
+            case PreLeader:
                 return false;
             case IsolatedLeader:
                 return true;
index 262eb6d246493d697870e1e61676e714d5b1cc6b..1ee161df01c9c6b8443a8c564fcf81e8bdced702 100644 (file)
@@ -148,7 +148,8 @@ final class ShardInformation {
 
     boolean isShardReadyWithLeaderId() {
         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                (isLeader() || addressResolver.resolve(leaderId) != null);
+                !RaftState.PreLeader.name().equals(role) &&
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
     }
 
     boolean isShardInitialized() {
index 0a82a1fffb43299d5b5aa43de5c2011b067d253b..e7f56baa5f38a8283c086973ed845dcfa3030658 100644 (file)
@@ -712,6 +712,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             }
         });
 
+        MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertEquals("getLastApplied", 0, raftState.getLastApplied());
+            }
+        });
+
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.