@Override
public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
- this.installSnapshotState = Preconditions.checkNotNull(state);
+ // Only record the state if an install snapshot isn't already in progress for this follower -
+ // a second call must not clobber the in-progress chunk-tracking state.
+ if(this.installSnapshotState == null) {
+ this.installSnapshotState = Preconditions.checkNotNull(state);
+ }
}
@Override
@Override
public String toString() {
- return "ApplyState{" +
- "identifier='" + identifier + '\'' +
- ", replicatedLogEntry.index =" + replicatedLogEntry.getIndex() +
- '}';
+ return "ApplyState [identifier=" + identifier + ", replicatedLogEntry=" + replicatedLogEntry + "]";
}
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
if(LOG.isTraceEnabled()) {
LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
}
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
if(followerLogInformation == null){
LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+ long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+ long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
boolean updated = false;
if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
// The follower's log is actually ahead of the leader's log. Normally this doesn't happen
followerLogInformation.setNextIndex(-1);
initiateCaptureSnapshot(followerId);
+
updated = true;
} else if (appendEntriesReply.isSuccess()) {
- updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 &&
+ followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+ // The follower's last entry is present in the leader's journal but the terms don't match so the
+ // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
+ // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
+ // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
+ // index reported by the follower. For the former case, the leader will send all entries starting with
+ // the previous follower's index and the follower will remove and replace the conflicting entries as
+ // needed. For the latter, the leader will initiate an install snapshot.
+
+ followerLogInformation.setNextIndex(followerLastLogIndex - 1);
+ updated = true;
+
+ LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
+ "leader's {} - set the follower's next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+ followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ } else {
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ }
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
- long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
- long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
if(appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// in common with this Leader and so would require a snapshot to be installed
// Force initiate a snapshot capture
initiateCaptureSnapshot(followerId);
- } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
- followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
+ } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 &&
+ followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
// the last snapshot, if any. We'll catch up the follower quickly by starting at the
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
- LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
-
followerLogInformation.decrNextIndex();
+ updated = true;
+
+ LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
+ followerLogInformation.getNextIndex());
}
}
} else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if(followerActor != null) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, followerLogInformation);
}
}
if (installSnapshotState != null) {
// if install snapshot is in process , then sent next chunk if possible
if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, followerLogInformation);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
if(sendAppendEntries) {
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- entries, followerId);
+ sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
}
}
}
- private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries, String followerId) {
+ private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
+ FollowerLogInformation followerLogInformation) {
+ // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
+ // possibly committing and applying conflicting entries (those with same index, different term) from a prior
+ // term that weren't replicated to a majority, which would be a violation of raft.
+ // - if the follower isn't active. In this case we don't know the state of the follower and we send an
+ // empty AppendEntries as a heart beat to prevent election.
+ // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
+ // need to send AppendEntries to prevent election.
+ boolean isInstallingSnapshot = followerLogInformation.getInstallSnapshotState() != null;
+ long leaderCommitIndex = isInstallingSnapshot || !followerLogInformation.isFollowerActive() ? -1 :
+ context.getCommitIndex();
+
+ long followerNextIndex = followerLogInformation.getNextIndex();
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
getLogEntryIndex(followerNextIndex - 1),
getLogEntryTerm(followerNextIndex - 1), entries,
- context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
+ leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
if(!entries.isEmpty() || LOG.isTraceEnabled()) {
- LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
appendEntries);
}
}
/**
+ * Initiates a snapshot capture to install on a follower.
+ *
* Install Snapshot works as follows
- * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 4. On complete, Follower sends back a InstallSnapshotReply.
- * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
- * then send the existing snapshot in chunks to the follower.
- * @param followerId
+ * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+ * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+ * the Leader's handleMessage with a SendInstallSnapshot message.
+ * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+ * the Follower via InstallSnapshot messages.
+ * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+ * follower.
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+ * then send the existing snapshot in chunks to the follower.
+ *
+ * @param followerId the id of the follower.
+ * @return true if capture was initiated, false otherwise.
*/
public boolean initiateCaptureSnapshot(String followerId) {
+ FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
+ // If a snapshot is present in the memory, most likely another install is in progress, so there is no
+ // need to capture a snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+
+ // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+ sendSnapshotChunk(followerActor, followerLogInfo);
return true;
} else {
- return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
+ if(captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
+ }
+
+ return captureInitiated;
}
}
if (followerActor != null) {
long nextIndex = followerLogInfo.getNextIndex();
- if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
+ if (followerLogInfo.getInstallSnapshotState() != null ||
+ context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
canInstallSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, followerLogInfo);
}
}
}
* Sends a snapshot chunk to a given follower
* InstallSnapshot should qualify as a heartbeat too.
*/
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
-
- // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
- // followerId to the followerToSnapshot map.
- LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
-
- int nextChunkIndex = installSnapshotState.incrementChunkIndex();
- Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if(installSnapshotState.isLastChunk(nextChunkIndex)) {
- serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
- }
-
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshot.get().getLastIncludedIndex(),
- snapshot.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- nextChunkIndex,
- installSnapshotState.getTotalChunks(),
- Optional.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
- actor()
- );
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
- installSnapshotState.getTotalChunks());
- }
+ private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
+ if (snapshot.isPresent()) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
+ logName());
+ followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
}
- } catch (IOException e) {
- LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
- }
- }
- /**
- * Acccepts snaphot as ByteString, enters into map for future chunks
- * creates and return a ByteString chunk
- */
- private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
- if (installSnapshotState == null) {
- installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
- logName());
- followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
- }
- byte[] nextChunk = installSnapshotState.getNextChunk();
+ // Ensure the snapshot bytes are set - this is a no-op if they were already set.
+ installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+
+ byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ nextSnapshotChunk.length);
+
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+ Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+ if(installSnapshotState.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+ }
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshot.get().getLastIncludedIndex(),
+ snapshot.get().getLastIncludedTerm(),
+ nextSnapshotChunk,
+ nextChunkIndex,
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
- return nextChunk;
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+ installSnapshotState.getTotalChunks());
+ }
+ }
}
private void sendHeartBeat() {
lastIndex = lastIndex();
long prevCommitIndex = context.getCommitIndex();
- context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
+ if(appendEntries.getLeaderCommit() > prevCommitIndex) {
+ context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
+ }
if (prevCommitIndex != context.getCommitIndex()) {
LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- LOG.debug(
- "{}: Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}",
- logName(), prevLogTerm, appendEntries.getPrevLogTerm());
+ LOG.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " +
+ "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
+ prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
&& !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
// This would be passed as the hash code of the last chunk when sending the first chunk
static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- private int snapshotChunkSize;
- private final ByteString snapshotBytes;
+ private final int snapshotChunkSize;
private final String logName;
+ private ByteString snapshotBytes;
private int offset = 0;
// the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
+ private int replyReceivedForOffset = -1;
// if replyStatus is false, the previous chunk is attempted
private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
+ private int chunkIndex = FIRST_CHUNK_INDEX;
+ private int totalChunks;
private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
- LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) {
+ LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
this.snapshotChunkSize = snapshotChunkSize;
- this.snapshotBytes = snapshotBytes;
this.logName = logName;
+ }
+
+ ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+
+ void setSnapshotBytes(ByteString snapshotBytes) {
+ if(this.snapshotBytes != null) {
+ return;
+ }
+
+ this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
- totalChunks = size / snapshotChunkSize +
- (size % snapshotChunkSize > 0 ? 1 : 0);
+ totalChunks = (size / snapshotChunkSize) + (size % snapshotChunkSize > 0 ? 1 : 0);
LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
chunkIndex = FIRST_CHUNK_INDEX;
}
- ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
int incrementOffset() {
if(replyStatus) {
// if prev chunk failed, we would want to sent the same chunk again
boolean canSendNextChunk() {
// we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
+ return snapshotBytes != null && replyReceivedForOffset == offset;
}
boolean isLastChunk(int index) {
int size = snapshotChunkSize;
if (snapshotChunkSize > snapshotLength) {
size = snapshotLength;
- } else if (start + snapshotChunkSize > snapshotLength) {
+ } else if ((start + snapshotChunkSize) > snapshotLength) {
size = snapshotLength - start;
}
*/
package org.opendaylight.controller.cluster.raft;
+import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
import org.junit.After;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.util.AbstractStringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
/**
public static class TestRaftActor extends MockRaftActor {
private final TestActorRef<MessageCollectorActor> collectorActor;
- private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
+ private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
private TestRaftActor(Builder builder) {
super(builder);
}
public void startDropMessages(Class<?> msgClass) {
- dropMessages.put(msgClass, Boolean.TRUE);
+ dropMessages.put(msgClass, msg -> true);
+ }
+
+ <T> void startDropMessages(Class<T> msgClass, Predicate<T> filter) {
+ dropMessages.put(msgClass, filter);
}
public void stopDropMessages(Class<?> msgClass) {
getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void handleCommand(Object message) {
if(message instanceof MockPayload) {
}
try {
- if(!dropMessages.containsKey(message.getClass())) {
+ Predicate drop = dropMessages.get(message.getClass());
+ if(drop == null || !drop.test(message)) {
super.handleCommand(message);
}
} finally {
protected DefaultConfigParamsImpl newLeaderConfigParams() {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
- configParams.setElectionTimeoutFactor(1);
+ configParams.setElectionTimeoutFactor(4);
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
actor.getCurrentBehavior().getReplicatedToAllIndex());
}
+
+ static void verifyRaftState(ActorRef raftActor, Consumer<OnDemandRaftState> verifier) {
+ Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ try {
+ OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+ verifier.accept(raftState);
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ lastError = new AssertionError("OnDemandRaftState failed", e);
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests isolation of nodes end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
+ private TestActorRef<Actor> follower1NotifierActor;
+ private TestActorRef<Actor> leaderNotifierActor;
+
+ /**
+ * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
+ * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
+ * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
+ * appropriately.
+ */
+ @Test
+ public void testLeaderIsolationWithAllPriorEntriesCommitted() throws Exception {
+ testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
+
+ createRaftActors();
+
+ // Send initial payloads and verify replication.
+
+ MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ MockPayload payload1 = sendPayloadData(leaderActor, "one");
+ verifyApplyJournalEntries(leaderCollectorActor, 1);
+ verifyApplyJournalEntries(follower1CollectorActor, 1);
+ verifyApplyJournalEntries(follower2CollectorActor, 1);
+
+ isolateLeader();
+
+ // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
+
+ testLog.info("Sending payload to isolated leader");
+
+ MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+
+ // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
+ // is collected but not forwarded to the follower RaftActor.
+
+ AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+ assertEquals("getTerm", currentTerm, appendEntries.getTerm());
+ assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
+ assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
+ verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
+
+ // The leader should transition to IsolatedLeader.
+
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+
+ forceElectionOnFollower1();
+
+ // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
+
+ testLog.info("Sending payload to new leader");
+
+ MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ verifyApplyJournalEntries(follower1CollectorActor, 2);
+ verifyApplyJournalEntries(follower2CollectorActor, 2);
+
+ assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+ assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
+ assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
+ verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
+
+ assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+ follower1Actor.underlyingActor().getState());
+
+ removeIsolation();
+
+ // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
+ // with a higher term.
+
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+
+ // The previous leader has a conflicting log entry at index 2 with a different term which should get
+ // replaced by the new leader's index 2 entry.
+
+ verifyApplyJournalEntries(leaderCollectorActor, 2);
+
+ assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
+ assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
+ verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
+
+ assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+ leaderActor.underlyingActor().getState());
+
+ testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
+ }
+
+ /**
+ * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
+ * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
+ * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
+ * sides should reconcile their logs appropriately.
+ */
+ @Test
+ public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() throws Exception {
+ testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
+
+ createRaftActors();
+
+ // Submit an initial payload that is committed/applied on all nodes.
+
+ MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ verifyApplyJournalEntries(leaderCollectorActor, 0);
+ verifyApplyJournalEntries(follower1CollectorActor, 0);
+ verifyApplyJournalEntries(follower2CollectorActor, 0);
+
+ // Submit another payload that is replicated to all followers and committed on the leader but the leader is
+ // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
+ // with the updated leader commit index.
+
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+
+ MockPayload payload1 = sendPayloadData(leaderActor, "one");
+
+ // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
+ // message is forwarded to the followers.
+
+ expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+ ae.getEntries().get(0).getData().equals(payload1);
+ });
+
+ expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+ ae.getEntries().get(0).getData().equals(payload1);
+ });
+
+ verifyApplyJournalEntries(leaderCollectorActor, 1);
+
+ isolateLeader();
+
+ // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
+
+ testLog.info("Sending payload to isolated leader");
+
+ MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+
+ // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
+ // is collected but not forwarded to the follower RaftActor.
+
+ AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+ assertEquals("getTerm", currentTerm, appendEntries.getTerm());
+ assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
+ assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
+ verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
+
+ // The leader should transition to IsolatedLeader.
+
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+
+ forceElectionOnFollower1();
+
+ // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
+ // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
+ // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
+
+ testLog.info("Sending payload to new leader");
+
+ MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ verifyApplyJournalEntries(follower1CollectorActor, 3);
+ verifyApplyJournalEntries(follower2CollectorActor, 3);
+
+ assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+ assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
+ assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
+ verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
+
+ assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+ follower1Actor.underlyingActor().getState());
+
+ removeIsolation();
+
+ // Previous leader should switch to follower because it will receive either an AppendEntries or
+ // AppendEntriesReply with a higher term.
+
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+
+ // The previous leader has a conflicting log entry at index 2 with a different term which should get
+ // replaced by the new leader's entry.
+
+ verifyApplyJournalEntries(leaderCollectorActor, 3);
+
+ verifyRaftState(leaderActor, raftState -> {
+ assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
+ assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
+ });
+
+ assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+ leaderActor.underlyingActor().getState());
+
+ // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
+
+ List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
+ for(ApplyState as: applyState) {
+ if(as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
+ fail("Got unexpected ApplyState: " + as);
+ }
+ }
+
+ // The prior leader should not have needed a snapshot installed in order to get it synced.
+
+ assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
+
+ testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
+ }
+
+ /**
+ * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
+ * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
+ * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
+ * and both sides should reconcile their logs appropriately.
+ */
+ @Test
+ public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() throws Exception {
+ testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
+
+ createRaftActors();
+
+ // Submit an initial payload that is committed/applied on all nodes.
+
+ MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+ verifyApplyJournalEntries(leaderCollectorActor, 0);
+ verifyApplyJournalEntries(follower1CollectorActor, 0);
+ verifyApplyJournalEntries(follower2CollectorActor, 0);
+
+ // Submit another payload that is replicated to all followers and committed on the leader but the leader is
+ // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
+ // with the updated leader commit index.
+
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+
+ MockPayload payload1 = sendPayloadData(leaderActor, "one");
+
+ // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
+ // message is forwarded to the followers.
+
+ expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+ ae.getEntries().get(0).getData().equals(payload1);
+ });
+
+ expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
+ return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+ ae.getEntries().get(0).getData().equals(payload1);
+ });
+
+ verifyApplyJournalEntries(leaderCollectorActor, 1);
+
+ isolateLeader();
+
+ // Send 3 payloads to the isolated leader so it has uncommitted log entries.
+
+ testLog.info("Sending 3 payloads to isolated leader");
+
+ sendPayloadData(leaderActor, "two");
+ sendPayloadData(leaderActor, "three");
+ sendPayloadData(leaderActor, "four");
+
+ // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
+ // are collected but not forwarded to the follower RaftActor.
+
+ expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
+ for(ReplicatedLogEntry e: ae.getEntries()) {
+ if(e.getIndex() == 4) {
+ return true;
+ }
+ }
+ return false;
+ });
+
+ // The leader should transition to IsolatedLeader.
+
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+
+ forceElectionOnFollower1();
+
+ // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
+ // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
+ // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
+
+ testLog.info("Sending 3 payloads to new leader");
+
+ MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
+ MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
+ verifyApplyJournalEntries(follower1CollectorActor, 5);
+ verifyApplyJournalEntries(follower2CollectorActor, 5);
+
+ assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+ assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
+ assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
+ verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
+
+ assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
+ newLeaderPayload4), follower1Actor.underlyingActor().getState());
+
+ removeIsolation();
+
+ // Previous leader should switch to follower because it will receive either an AppendEntries or
+ // AppendEntriesReply with a higher term.
+
+ expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+
+ // The previous leader has conflicting log entries starting at index 2 with different terms which should get
+ // replaced by the new leader's entries.
+
+ verifyApplyJournalEntries(leaderCollectorActor, 5);
+
+ verifyRaftState(leaderActor, raftState -> {
+ assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
+ assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+ assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
+ });
+
+ assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
+ newLeaderPayload4), leaderActor.underlyingActor().getState());
+
+ // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
+
+ List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
+ for(ApplyState as: applyState) {
+ if(as.getReplicatedLogEntry().getTerm() == 1) {
+ fail("Got unexpected ApplyState: " + as);
+ }
+ }
+
+ // The prior leader should not have needed a snapshot installed in order to get it synced.
+
+ assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
+
+ testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
+ }
+
+ private void removeIsolation() {
+ testLog.info("Removing isolation");
+
+ clearMessages(leaderNotifierActor);
+ clearMessages(leaderCollectorActor);
+
+ leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
+ leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
+ follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+ follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+ }
+
+ private void forceElectionOnFollower1() {
+ // Force follower1 to start an election. follower2 should grant the vote.
+
+ testLog.info("Forcing election on {}", follower1Id);
+
+ follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+ expectFirstMatching(follower1NotifierActor, RoleChanged.class,
+ rc -> rc.getNewRole().equals(RaftState.Leader.name()));
+
+ currentTerm = follower1Context.getTermInformation().getCurrentTerm();
+ }
+
+ private void isolateLeader() {
+ // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+ testLog.info("Isolating the leader");
+
+ leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
+ leaderActor.underlyingActor().startDropMessages(RequestVote.class);
+
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
+
+ clearMessages(follower1CollectorActor);
+ clearMessages(follower1NotifierActor);
+ clearMessages(leaderNotifierActor);
+ }
+
+ private void createRaftActors() {
+ testLog.info("createRaftActors starting");
+
+ follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ factory.generateActorId(follower1Id + "-notifier"));
+
+ DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
+ followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ followerConfigParams.setElectionTimeoutFactor(1000);
+ follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+ config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+ peerAddresses = ImmutableMap.<String, String>builder().
+ put(follower1Id, follower1Actor.path().toString()).
+ put(follower2Id, follower2Actor.path().toString()).build();
+
+ leaderConfigParams = newLeaderConfigParams();
+ leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
+
+ leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ factory.generateActorId(leaderId + "-notifier"));
+
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+ config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+
+ follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+ leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+ leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+ waitUntilLeader(leaderActor);
+
+ expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
+
+
+ clearMessages(leaderCollectorActor);
+ clearMessages(follower1CollectorActor);
+ clearMessages(follower2CollectorActor);
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+ currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+ follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+ follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+
+ testLog.info("createRaftActors ending");
+ }
+}
*/
package org.opendaylight.controller.cluster.raft;
-import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
testLog.info("createRaftActors starting");
}
- private static void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
- Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
- Throwable lastError = null;
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- try {
- OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
- GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
- assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
- return;
- } catch (Exception | AssertionError e) {
- lastError = e;
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
- }
-
- throw lastError;
+ private static void verifyRaftState(ActorRef raftActor, final RaftState expState) {
+ verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
}
private static void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
setReplicatedLog(replicatedLog);
setCommitIndex(replicatedLog.lastIndex());
+ setLastApplied(replicatedLog.lastIndex());
}
@Override public ActorRef actorOf(Props props) {
import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Test;
MessageCollectorActor.clearMessages(leaderCollectorActor);
- testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
+ testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
+
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
// Send another payload to trigger a second leader snapshot.
MockPayload payload7 = sendPayloadData(leaderActor, "seven");
expSnapshotState.add(payload1);
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
+
// Send another payload with a large enough relative size in combination with the last payload
// that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
@Test
public void testBecomePreLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
MockRaftActorContext raftActorContext = createActorContext();
+ raftActorContext.setLastApplied(-1);
raftActorContext.setPeerAddresses(setupPeers(2));
candidate = new Candidate(raftActorContext);
MockRaftActorContext actorContext = createActorContextWithFollower();
actorContext.setCommitIndex(-1);
+ actorContext.setLastApplied(-1);
// The raft context is initialized with a couple log entries. However the commitIndex
// is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ fts.setSnapshotBytes(bs);
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
//send first chunk and no InstallSnapshotReply received yet
actorContext.getReplicatedLog().removeFrom(0);
leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
// Leader will send an immediate heartbeat - ignore it.
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
// installed with a SendInstallSnapshot
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
ByteString bs = toByteString(leadersSnapshot);
leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
commitIndex, snapshotTerm, commitIndex, snapshotTerm));
- LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ fts.setSnapshotBytes(bs);
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while(!fts.isLastChunk(fts.getChunkIndex())) {
fts.getNextChunk();
ByteString bs = toByteString(leadersSnapshot);
byte[] barray = bs.toByteArray();
- LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
+ LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
+ fts.setSnapshotBytes(bs);
assertEquals(bs.size(), barray.length);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(-1, appendEntries.getLeaderCommit());
assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
// Initial heartbeat
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(-1, appendEntries.getLeaderCommit());
assertEquals(0, appendEntries.getEntries().size());
assertEquals(0, appendEntries.getPrevLogIndex());
MessageCollectorActor.clearMessages(followerActor);
MessageCollectorActor.clearMessages(leaderActor);
- // Verify initial AppendEntries sent with the leader's current commit index.
- assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ // Verify initial AppendEntries sent.
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
assertEquals("Log entries size", 0, appendEntries.getEntries().size());
assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
MessageCollectorActor.clearMessages(leaderActor);
// Verify initial AppendEntries sent with the leader's current commit index.
- assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
assertEquals("Log entries size", 0, appendEntries.getEntries().size());
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
MessageCollectorActor.clearMessages(leaderActor);
// Verify initial AppendEntries sent with the leader's current commit index.
- assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
assertEquals("Log entries size", 0, appendEntries.getEntries().size());
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
MessageCollectorActor.clearMessages(leaderActor);
// Verify initial AppendEntries sent with the leader's current commit index.
- assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
assertEquals("Log entries size", 0, appendEntries.getEntries().size());
assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
String nonVotingFollowerId = "nonvoting-follower";
TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
logStart("testTransferLeadershipWithFollowerInSync");
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ leaderActorContext.setLastApplied(-1);
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());