From: Tom Pantelis Date: Wed, 27 Jul 2016 19:52:53 +0000 (-0400) Subject: Bug 5504: Add PreLeader raft state X-Git-Tag: release/boron~20 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d86f990976dcc2879b40dec7df1b3b5fba8cba78 Bug 5504: Add PreLeader raft state The following scenario can result in a "store tree and candidate base differ" IllegalStateException on commit: A follower receives a replicate and adds it to the log, say at index 1, but the leader transfers or dies before committing and applying it to the state. The follower becomes leader and when the next tx is applied, log index 2, it has to first apply all log entries from the previous term that hadn't been committed yet, in this case index 1. Since we got consensus for index 2 that means index 1 has also been replicated to a majority. Therefore ApplyState is sent for index 1 and then index 2. However index 1 is applied as a "foreign" candidate while index 2 is in the pre-commit state. When index 2 is applied the commit fails. To prevent this scenario, we introduce a new raft state, PreLeader, which is transitioned to from Candidate if there are uncommitted entries, ie commit index < last log index. The PreLeader state performs all the duties of Leader with the added behavior of attempting to commit all uncommitted entries from the previous leader's term. Raft does not allow a leader to commit entries from a previous term by simply counting replicas - only entries from the leader's current term can be committed (§5.4.2). Rather then waiting for a client interaction to commit a new entry, the PreLeader state immediately appends a no-op entry (NoopPayload) to the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed indirectly. Once all entries are committed, ie commitIndex matches the last log index, it switches to the normal Leader state. The PreLeader state is considered an inactive leader state and thus client transactions are delayed until it transitions to Leader. Change-Id: I20a541de0eba9b0075b9952dc6d5808943b7bb8f Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 3037059c3d..90f23ddcfc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -50,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; @@ -235,8 +236,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { applyState.getReplicatedLogEntry().getData()); } - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); + if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), + applyState.getReplicatedLogEntry().getData()); + } long elapsedTime = System.nanoTime() - startTime; if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ @@ -279,6 +282,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onShutDown(); } else if(message instanceof Runnable) { ((Runnable)message).run(); + } else if(message instanceof NoopPayload) { + persistData(null, null, (NoopPayload)message); } else { // Processing the message may affect the state, hence we need to capture it final RaftActorBehavior currentBehavior = getCurrentBehavior(); @@ -536,7 +541,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // the state to durable storage self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); - } else if (clientActor != null) { + } else { context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); // Send message for replication @@ -574,8 +579,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected final boolean isLeaderActive() { - return getRaftState() != RaftState.IsolatedLeader && !shuttingDown && - !isLeadershipTransferInProgress(); + return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader && + !shuttingDown && !isLeadershipTransferInProgress(); } private boolean isLeadershipTransferInProgress() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java index 32c0c18423..5c5b947404 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java @@ -12,5 +12,6 @@ public enum RaftState { Candidate, Follower, Leader, - IsolatedLeader; + IsolatedLeader, + PreLeader; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index f39351bcf2..94e08658e8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -99,12 +99,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; private int minReplicationCount; - protected AbstractLeader(RaftActorContext context, RaftState state) { + protected AbstractLeader(RaftActorContext context, RaftState state, + @Nullable AbstractLeader initializeFromLeader) { super(context, state); - for(PeerInfo peerInfo: context.getPeers()) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); - followerToLog.put(peerInfo.getId(), followerLogInformation); + if(initializeFromLeader != null) { + followerToLog.putAll(initializeFromLeader.followerToLog); + } else { + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); + } } LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); @@ -123,6 +128,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + protected AbstractLeader(RaftActorContext context, RaftState state) { + this(context, state, null); + } + /** * Return an immutable collection of follower identifiers. * @@ -529,16 +538,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), - replicate.getIdentifier(), logIndex); + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); // Create a tracker entry we will use this later to notify the // client actor - trackers.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); + if(replicate.getClientActor() != null) { + trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), + logIndex)); + } boolean applyModificationToState = !context.anyVotingPeers() || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 59ba1a9d86..943c4f97d5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -78,6 +78,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return new IsolatedLeader(context); case Leader: return new Leader(context); + case PreLeader: + return new PreLeader(context); default: throw new IllegalArgumentException("Unhandled state " + state); } @@ -428,7 +430,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return this; } - private RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { + protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); try { close(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 1205c4bad6..90120e9ac3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -115,7 +115,13 @@ public class Candidate extends AbstractRaftActorBehavior { } if (voteCount >= votesRequired) { - return internalSwitchBehavior(RaftState.Leader); + if(context.getCommitIndex() < context.getReplicatedLog().lastIndex()) { + LOG.debug("{}: Connmit index {} is behind last index {}", logName(), context.getCommitIndex(), + context.getReplicatedLog().lastIndex()); + return internalSwitchBehavior(RaftState.PreLeader); + } else { + return internalSwitchBehavior(RaftState.Leader); + } } return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index e9b45a3e5f..58cf716106 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -55,8 +55,12 @@ public class Leader extends AbstractLeader { private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted(); private @Nullable LeadershipTransferContext leadershipTransferContext; + Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) { + super(context, RaftState.Leader, initializeFromLeader); + } + public Leader(RaftActorContext context) { - super(context, RaftState.Leader); + this(context, null); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java new file mode 100644 index 0000000000..e3ae4f427a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/PreLeader.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.behaviors; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; + +/** + * The behavior of a RaftActor when it is in the PreLeader state. This state performs all the duties of + * Leader with the added behavior of attempting to commit all uncommitted entries from the previous leader's + * term. Raft does not allow a leader to commit entries from a previous term by simply counting replicas - + * only entries from the leader's current term can be committed (§5.4.2). Rather then waiting for a client + * interaction to commit a new entry, the PreLeader state immediately appends a no-op entry (NoopPayload) to + * the log with the leader's current term. Once the no-op entry is committed, all prior entries are committed + * indirectly. Once all entries are committed, ie commitIndex matches the last log index, it switches to the + * normal Leader state. + *

+ * The use of a no-op entry in this manner is outlined in the last paragraph in §8 of the + * extended raft version. + * + * @author Thomas Pantelis + */ +public class PreLeader extends AbstractLeader { + + public PreLeader(RaftActorContext context) { + super(context, RaftState.PreLeader); + + context.getActor().tell(NoopPayload.INSTANCE, context.getActor()); + } + + @Override + protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply); + + if(context.getCommitIndex() >= context.getReplicatedLog().lastIndex()) { + // We've committed all entries - we can switch to Leader. + returnBehavior = internalSwitchBehavior(new Leader(context, this)); + } + + return returnBehavior; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java new file mode 100644 index 0000000000..2393a30610 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/NoopPayload.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.persisted; + +import java.io.Serializable; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; + +/** + * Payload used for no-op log entries that are put into the journal by the PreLeader in order to commit + * entries from the prior term. + * + * @author Thomas Pantelis + */ +public final class NoopPayload extends Payload implements Serializable { + private static final long serialVersionUID = 1L; + public static final NoopPayload INSTANCE = new NoopPayload(); + + private NoopPayload() { + } + + @Override + public int size() { + return 0; + } + + private Object readResolve() { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 1a2ed4bb15..2a39c29d53 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -84,11 +84,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest this.collectorActor = builder.collectorActor; } - void startDropMessages(Class msgClass) { + public void startDropMessages(Class msgClass) { dropMessages.put(msgClass, Boolean.TRUE); } - void stopDropMessages(Class msgClass) { + public void stopDropMessages(Class msgClass) { dropMessages.remove(msgClass); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 5a4b43721d..a41c5013de 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -81,6 +81,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1"))); replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2"))); setReplicatedLog(replicatedLog); + setCommitIndex(replicatedLog.lastIndex()); } @Override public ActorRef actorOf(Props props) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java new file mode 100644 index 0000000000..d451ea176d --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/PreLeaderScenarioTest.java @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertEquals; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching; +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; + +/** + * Tests PreLeader raft state functionality end-to-end. + * + * @author Thomas Pantelis + */ +public class PreLeaderScenarioTest extends AbstractRaftActorIntegrationTest { + + private TestActorRef follower1NotifierActor; + private DefaultConfigParamsImpl followerConfigParams; + + @Test + public void testUnComittedEntryOnLeaderChange() throws Exception { + testLog.info("testUnComittedEntryOnLeaderChange starting"); + + createRaftActors(); + + // Drop AppendEntriesReply to the leader so it doesn't commit the payload entry. + leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class); + follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + + // Send a payload and verify AppendEntries is received in follower1. + MockPayload payload0 = sendPayloadData(leaderActor, "zero"); + + AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class); + assertEquals("AppendEntries - # entries", 1, appendEntries.getEntries().size()); + verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 0, payload0); + + // Kill the leader actor. + killActor(leaderActor); + + // At this point, the payload entry is in follower1's log but is uncommitted. follower2 has not + // received the payload entry yet. + assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size()); + assertEquals("Follower 1 journal last index", 0, follower1Context.getReplicatedLog().lastIndex()); + assertEquals("Follower 1 commit index", -1, follower1Context.getCommitIndex()); + assertEquals("Follower 1 last applied index", -1, follower1Context.getLastApplied()); + + assertEquals("Follower 2 journal log size", 0, follower2Context.getReplicatedLog().size()); + + follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); + clearMessages(follower1NotifierActor); + + // Force follower1 to start an election. It should win since it's journal is more up-to-date than + // follower2's journal. + follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + + // Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry. + List roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3); + assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole()); + assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole()); + assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole()); + + long previousTerm = currentTerm; + currentTerm = follower1Context.getTermInformation().getCurrentTerm(); + + // Since it went to Leader, it should've appended and successfully replicated a NoopPaylod with the + // new term to follower2 and committed both entries, including the first payload from the previous term. + assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size()); + assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex()); + assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex()); + verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0); + verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE); + + // Both entries should be applied to the state. + expectMatching(follower1CollectorActor, ApplyState.class, 2); + expectMatching(follower2CollectorActor, ApplyState.class, 2); + + assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied()); + + // Verify follower2's journal matches follower1's. + assertEquals("Follower 2 journal log size", 2, follower2Context.getReplicatedLog().size()); + assertEquals("Follower 2 journal last index", 1, follower2Context.getReplicatedLog().lastIndex()); + assertEquals("Follower 2 commit index", 1, follower2Context.getCommitIndex()); + assertEquals("Follower 2 last applied index", 1, follower2Context.getLastApplied()); + verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(0), previousTerm, 0, payload0); + verifyReplicatedLogEntry(follower2Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE); + + // Reinstate follower1. + killActor(follower1Actor); + + follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses( + ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))). + config(followerConfigParams)); + follower1Actor.underlyingActor().waitForRecoveryComplete(); + follower1Context = follower1Actor.underlyingActor().getRaftActorContext(); + + // Verify follower1's journal was persisted and recovered correctly. + assertEquals("Follower 1 journal log size", 2, follower1Context.getReplicatedLog().size()); + assertEquals("Follower 1 journal last index", 1, follower1Context.getReplicatedLog().lastIndex()); + assertEquals("Follower 1 commit index", 1, follower1Context.getCommitIndex()); + assertEquals("Follower 1 last applied index", 1, follower1Context.getLastApplied()); + verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(0), previousTerm, 0, payload0); + verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(1), currentTerm, 1, NoopPayload.INSTANCE); + + testLog.info("testUnComittedEntryOnLeaderChange ending"); + } + + private void createRaftActors() { + testLog.info("createRaftActors starting"); + + follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + factory.generateActorId(follower1Id + "-notifier")); + + followerConfigParams = newFollowerConfigParams(); + followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + followerConfigParams.setSnapshotBatchCount(snapshotBatchCount); + follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses( + ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))). + config(followerConfigParams).roleChangeNotifier(follower1NotifierActor)); + + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), followerConfigParams); + + peerAddresses = ImmutableMap.builder(). + put(follower1Id, follower1Actor.path().toString()). + put(follower2Id, follower2Actor.path().toString()).build(); + + leaderConfigParams = newLeaderConfigParams(); + leaderConfigParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + + follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); + follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); + leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); + + leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + waitUntilLeader(leaderActor); + + expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2); + + clearMessages(leaderCollectorActor); + clearMessages(follower1CollectorActor); + clearMessages(follower2CollectorActor); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + currentTerm = leaderContext.getTermInformation().getCurrentTerm(); + + follower1Context = follower1Actor.underlyingActor().getRaftActorContext(); + follower2Context = follower2Actor.underlyingActor().getRaftActorContext(); + + testLog.info("createRaftActors ending"); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 6e431f28ad..47d39dcc61 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -1299,6 +1299,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry); InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("2"))); + InMemoryJournal.addEntry(node2ID, 4, new ApplyJournalEntries(1)); TestActorRef node1Collector = actorFactory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 40080a8c72..c6773d83c2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -126,6 +126,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { raftActorContext.getTermInformation().update(2L, "other"); raftActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder(). createEntries(0, 5, 1).build()); + raftActorContext.setCommitIndex(raftActorContext.getReplicatedLog().lastIndex()); raftActorContext.setPeerAddresses(setupPeers(4)); candidate = new Candidate(raftActorContext); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index b11888f76f..b25ba2fde2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -893,6 +893,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { logStart("testInitialSyncUpWithHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); + context.setCommitIndex(-1); follower = createBehavior(context); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 461dfe506d..292562c92f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -106,6 +106,7 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); short payloadVersion = (short)5; actorContext.setPayloadVersion(payloadVersion); @@ -201,6 +202,7 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); // The raft context is initialized with a couple log entries. However the commitIndex // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't @@ -1987,6 +1989,7 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(1000, TimeUnit.SECONDS)); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); String nonVotingFollowerId = "nonvoting-follower"; TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 6c151f8c76..4674348292 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -260,6 +260,7 @@ class EntityOwnershipShard extends Shard { case Candidate: case Follower: case Leader: + case PreLeader: return false; case IsolatedLeader: return true; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java index 262eb6d246..1ee161df01 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java @@ -148,7 +148,8 @@ final class ShardInformation { boolean isShardReadyWithLeaderId() { return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) && - (isLeader() || addressResolver.resolve(leaderId) != null); + !RaftState.PreLeader.name().equals(role) && + (isLeader() || addressResolver.resolve(leaderId) != null); } boolean isShardInitialized() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0a82a1fffb..e7f56baa5f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -712,6 +712,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } }); + MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("getLastApplied", 0, raftState.getLastApplied()); + } + }); + // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in // the leader shard.