Bug 5504: Add PreLeader raft state 28/42728/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 27 Jul 2016 19:52:53 +0000 (15:52 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 5 Aug 2016 15:10:14 +0000 (15:10 +0000)
The following scenario can result in a "store tree and candidate base
differ" IllegalStateException on commit:

A follower receives a replicate and adds it to the log, say at index 1,
but the leader transfers or dies before committing and applying it to the
state. The follower becomes leader and when the next tx is applied, log
index 2, it has to first apply all log entries from the previous term that
hadn't been committed yet, in this case index 1. Since we got consensus for
index 2 that means index 1 has also been replicated to a majority. Therefore
ApplyState is sent for index 1 and then index 2. However index 1 is applied
as a "foreign" candidate while index 2 is in the pre-commit state. When
index 2 is applied the commit fails.

To prevent this scenario, we introduce a new raft state, PreLeader,
which is transitioned to from Candidate if there are uncommitted
entries, ie commit index < last log index. The PreLeader state performs all
the duties of Leader with the added behavior of attempting to commit all
uncommitted entries from the previous leader's term. Raft does not allow a
leader to commit entries from a previous term by simply counting replicas -
only entries from the leader's current term can be committed (§5.4.2). Rather
then waiting for a client interaction to commit a new entry, the PreLeader
state immediately appends a no-op entry (NoopPayload) to the log with the
leader's current term. Once the no-op entry is committed, all prior entries
are committed indirectly. Once all entries are committed, ie commitIndex matches
the last log index, it switches to the normal Leader state.

The PreLeader state is considered an inactive leader state and thus
client transactions are delayed until it transitions to Leader.

Change-Id: I20a541de0eba9b0075b9952dc6d5808943b7bb8f
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
18 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index 3037059..90f23dd 100644 (file)
@@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
@@ -235,8 +236,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     applyState.getReplicatedLogEntry().getData());
             }
 
-            applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                applyState.getReplicatedLogEntry().getData());
+            if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) {
+                applyState(applyState.getClientActor(), applyState.getIdentifier(),
+                    applyState.getReplicatedLogEntry().getData());
+            }
 
             long elapsedTime = System.nanoTime() - startTime;
             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
@@ -279,6 +282,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             onShutDown();
         } else if(message instanceof Runnable) {
             ((Runnable)message).run();
+        } else if(message instanceof NoopPayload) {
+            persistData(null, null, (NoopPayload)message);
         } else {
             // Processing the message may affect the state, hence we need to capture it
             final RaftActorBehavior currentBehavior = getCurrentBehavior();
@@ -536,7 +541,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 // the state to durable storage
                 self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
 
-            } else if (clientActor != null) {
+            } else {
                 context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
 
                 // Send message for replication
@@ -574,8 +579,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected final boolean isLeaderActive() {
-        return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
-                !isLeadershipTransferInProgress();
+        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader &&
+                !shuttingDown && !isLeadershipTransferInProgress();
     }
 
     private boolean isLeadershipTransferInProgress() {
index f39351b..94e0865 100644 (file)
@@ -99,12 +99,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private Optional<SnapshotHolder> snapshot;
     private int minReplicationCount;
 
-    protected AbstractLeader(RaftActorContext context, RaftState state) {
+    protected AbstractLeader(RaftActorContext context, RaftState state,
+            @Nullable AbstractLeader initializeFromLeader) {
         super(context, state);
 
-        for(PeerInfo peerInfo: context.getPeers()) {
-            FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
-            followerToLog.put(peerInfo.getId(), followerLogInformation);
+        if(initializeFromLeader != null) {
+            followerToLog.putAll(initializeFromLeader.followerToLog);
+        } else {
+            for(PeerInfo peerInfo: context.getPeers()) {
+                FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+                followerToLog.put(peerInfo.getId(), followerLogInformation);
+            }
         }
 
         LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
@@ -123,6 +128,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
     }
 
+    protected AbstractLeader(RaftActorContext context, RaftState state) {
+        this(context, state, null);
+    }
+
     /**
      * Return an immutable collection of follower identifiers.
      *
@@ -529,16 +538,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
-                replicate.getIdentifier(), logIndex);
+        LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
-        trackers.add(
-            new ClientRequestTrackerImpl(replicate.getClientActor(),
-                replicate.getIdentifier(),
-                logIndex)
-        );
+        if(replicate.getClientActor() != null) {
+            trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
+                    logIndex));
+        }
 
         boolean applyModificationToState = !context.anyVotingPeers()
                 || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
index 59ba1a9..943c4f9 100644 (file)
@@ -78,6 +78,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 return new IsolatedLeader(context);
             case Leader:
                 return new Leader(context);
+            case PreLeader:
+                return new PreLeader(context);
             default:
                 throw new IllegalArgumentException("Unhandled state " + state);
         }
@@ -428,7 +430,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return this;
     }
 
-    private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+    protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
         LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
         try {
             close();
index 1205c4b..90120e9 100644 (file)
@@ -115,7 +115,13 @@ public class Candidate extends AbstractRaftActorBehavior {
         }
 
         if (voteCount >= votesRequired) {
-            return internalSwitchBehavior(RaftState.Leader);
+            if(context.getCommitIndex() < context.getReplicatedLog().lastIndex()) {
+                LOG.debug("{}: Connmit index {} is behind last index {}", logName(), context.getCommitIndex(),
+                        context.getReplicatedLog().lastIndex());
+                return internalSwitchBehavior(RaftState.PreLeader);
+            } else {
+                return internalSwitchBehavior(RaftState.Leader);
+            }
         }
 
         return this;
index e9b45a3..58cf716 100644 (file)
@@ -55,8 +55,12 @@ public class Leader extends AbstractLeader {
     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
     private @Nullable LeadershipTransferContext leadershipTransferContext;
 
+    Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
+        super(context, RaftState.Leader, initializeFromLeader);
+    }
+
     public Leader(RaftActorContext context) {
-        super(context, RaftState.Leader);
+        this(context, null);
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java
new file mode 100644 (file)
index 0000000..e3ae4f4
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+
+/**
+ * The behavior of a RaftActor when it is in the PreLeader state. This state performs all the duties of
+ * Leader with the added behavior of attempting to commit all uncommitted entries from the previous leader's
+ * term. Raft does not allow a leader to commit entries from a previous term by simply counting replicas -
+ * only entries from the leader's current term can be committed (§5.4.2). Rather then waiting for a client
+ * interaction to commit a new entry, the PreLeader state immediately appends a no-op entry (NoopPayload) to
+ * the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed
+ * indirectly. Once all entries are committed, ie commitIndex matches the last log index, it switches to the
+ * normal Leader state.
+ * <p>
+ * The use of a no-op entry in this manner is outlined in the last paragraph in §8 of the
+ * <a href="https://raft.github.io/raft.pdf">extended raft version</a>.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLeader extends AbstractLeader {
+
+    public PreLeader(RaftActorContext context) {
+        super(context, RaftState.PreLeader);
+
+        context.getActor().tell(NoopPayload.INSTANCE, context.getActor());
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+        RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
+
+        if(context.getCommitIndex() >= context.getReplicatedLog().lastIndex()) {
+            // We've committed all entries - we can switch to Leader.
+            returnBehavior = internalSwitchBehavior(new Leader(context, this));
+        }
+
+        return returnBehavior;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java
new file mode 100644 (file)
index 0000000..2393a30
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.persisted;
+
+import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Payload used for no-op log entries that are put into the journal by the PreLeader in order to commit
+ * entries from the prior term.
+ *
+ * @author Thomas Pantelis
+ */
+public final class NoopPayload extends Payload implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final NoopPayload INSTANCE = new NoopPayload();
+
+    private NoopPayload() {
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+
+    private Object readResolve() {
+        return INSTANCE;
+    }
+}
index 1a2ed4b..2a39c29 100644 (file)
@@ -84,11 +84,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             this.collectorActor = builder.collectorActor;
         }
 
-        void startDropMessages(Class<?> msgClass) {
+        public void startDropMessages(Class<?> msgClass) {
             dropMessages.put(msgClass, Boolean.TRUE);
         }
 
-        void stopDropMessages(Class<?> msgClass) {
+        public void stopDropMessages(Class<?> msgClass) {
             dropMessages.remove(msgClass);
         }
 
index 5a4b437..a41c501 100644 (file)
@@ -81,6 +81,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
         replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
         setReplicatedLog(replicatedLog);
+        setCommitIndex(replicatedLog.lastIndex());
     }
 
     @Override public ActorRef actorOf(Props props) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java
new file mode 100644 (file)
index 0000000..d451ea1
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests PreLeader raft state functionality end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class PreLeaderScenarioTest extends AbstractRaftActorIntegrationTest {
+
+    private TestActorRef<Actor> follower1NotifierActor;
+    private DefaultConfigParamsImpl followerConfigParams;
+
+    @Test
+    public void testUnComittedEntryOnLeaderChange() throws Exception {
+        testLog.info("testUnComittedEntryOnLeaderChange starting");
+
+        createRaftActors();
+
+        // Drop AppendEntriesReply to the leader so it doesn't commit the payload entry.
+        leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        // Send a payload and verify AppendEntries is received in follower1.
+        MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+
+        AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+        assertEquals("AppendEntries - # entries", 1, appendEntries.getEntries().size());
+        verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 0, payload0);
+
+        // Kill the leader actor.
+        killActor(leaderActor);
+
+        // At this point, the payload entry is in follower1's log but is uncommitted. follower2 has not
+        // received the payload entry yet.
+        assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size());
+        assertEquals("Follower 1 journal last index", 0, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", -1, follower1Context.getCommitIndex());
+        assertEquals("Follower 1 last applied index", -1, follower1Context.getLastApplied());
+
+        assertEquals("Follower 2 journal log size", 0, follower2Context.getReplicatedLog().size());
+
+        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        clearMessages(follower1NotifierActor);
+
+        // Force follower1 to start an election. It should win since it's journal is more up-to-date than
+        // follower2's journal.
+        follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+        // Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry.
+        List<RoleChanged> roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3);
+        assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole());
+        assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
+        assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
+
+        long previousTerm = currentTerm;
+        currentTerm = follower1Context.getTermInformation().getCurrentTerm();
+
+        // Since it went to Leader, it should've appended and successfully replicated a NoopPaylod with the
+        // new term to follower2 and committed both entries, including the first payload from the previous term.
+        assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
+        assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
+
+        // Both entries should be applied to the state.
+        expectMatching(follower1CollectorActor, ApplyState.class, 2);
+        expectMatching(follower2CollectorActor, ApplyState.class, 2);
+
+        assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
+
+        // Verify follower2's journal matches follower1's.
+        assertEquals("Follower 2 journal log size", 2, follower2Context.getReplicatedLog().size());
+        assertEquals("Follower 2 journal last index", 1, follower2Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 2 commit index", 1, follower2Context.getCommitIndex());
+        assertEquals("Follower 2 last applied index", 1, follower2Context.getLastApplied());
+        verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
+        verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
+
+        // Reinstate follower1.
+        killActor(follower1Actor);
+
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                config(followerConfigParams));
+        follower1Actor.underlyingActor().waitForRecoveryComplete();
+        follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+
+        // Verify follower1's journal was persisted and recovered correctly.
+        assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size());
+        assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex());
+        assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0);
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE);
+
+        testLog.info("testUnComittedEntryOnLeaderChange ending");
+    }
+
+    private void createRaftActors() {
+        testLog.info("createRaftActors starting");
+
+        follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower1Id + "-notifier"));
+
+        followerConfigParams = newFollowerConfigParams();
+        followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).
+                put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+        waitUntilLeader(leaderActor);
+
+        expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
+
+        clearMessages(leaderCollectorActor);
+        clearMessages(follower1CollectorActor);
+        clearMessages(follower2CollectorActor);
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+        follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+
+        testLog.info("createRaftActors ending");
+    }
+}
index 6e431f2..47d39dc 100644 (file)
@@ -1299,6 +1299,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
         InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
                 new MockRaftActorContext.MockPayload("2")));
+        InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1));
 
         TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
index 40080a8..c6773d8 100644 (file)
@@ -126,6 +126,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
         raftActorContext.getTermInformation().update(2L, "other");
         raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().
                 createEntries(0, 5, 1).build());
+        raftActorContext.setCommitIndex(raftActorContext.getReplicatedLog().lastIndex());
         raftActorContext.setPeerAddresses(setupPeers(4));
         candidate = new Candidate(raftActorContext);
 
index b11888f..b25ba2f 100644 (file)
@@ -893,6 +893,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         logStart("testInitialSyncUpWithHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.setCommitIndex(-1);
 
         follower = createBehavior(context);
 
index 461dfe5..292562c 100644 (file)
@@ -106,6 +106,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
         short payloadVersion = (short)5;
         actorContext.setPayloadVersion(payloadVersion);
 
@@ -201,6 +202,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
 
         // The raft context is initialized with a couple log entries. However the commitIndex
         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
@@ -1987,6 +1989,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 new FiniteDuration(1000, TimeUnit.SECONDS));
 
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
index 6c151f8..4674348 100644 (file)
@@ -260,6 +260,7 @@ class EntityOwnershipShard extends Shard {
             case Candidate:
             case Follower:
             case Leader:
+            case PreLeader:
                 return false;
             case IsolatedLeader:
                 return true;
index 262eb6d..1ee161d 100644 (file)
@@ -148,7 +148,8 @@ final class ShardInformation {
 
     boolean isShardReadyWithLeaderId() {
         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                (isLeader() || addressResolver.resolve(leaderId) != null);
+                !RaftState.PreLeader.name().equals(role) &&
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
     }
 
     boolean isShardInitialized() {
index 0a82a1f..e7f56ba 100644 (file)
@@ -712,6 +712,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             }
         });
 
+        MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertEquals("getLastApplied", 0, raftState.getLastApplied());
+            }
+        });
+
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.