import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
- persistence().persist(applyEntries, NoopProcedure.instance());
+ persistence().persistAsync(applyEntries, NoopProcedure.instance());
} else if (message instanceof FindLeader) {
getSender().tell(
ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
+ replicatedLogEntry.setPersistencePending(true);
LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
final RaftActorContext raftContext = getRaftActorContext();
- replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+ boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+ // Clear the persistence pending flag in the log entry.
+ persistedLogEntry.setPersistencePending(false);
+
if (!hasFollowers()) {
// Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
- raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+ raftContext.setCommitIndex(persistedLogEntry.getIndex());
+ raftContext.setLastApplied(persistedLogEntry.getIndex());
// Apply the state immediately.
- self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+ self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
// Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+ self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
} else {
- context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
- // Send message for replication
- getCurrentBehavior().handleMessage(getSelf(),
- new Replicate(clientActor, identifier, replicatedLogEntry1));
+ // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+ // normally should still be the leader) to check if consensus has now been reached in conjunction with
+ // follower replication.
+ getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
}
- });
+ }, true);
+
+ if (wasAppended && hasFollowers()) {
+ // Send log entry for replication.
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+ }
}
private ReplicatedLog replicatedLog() {
* Appends an entry to the in-memory log and persists it as well.
*
* @param replicatedLogEntry the entry to append
- */
- void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
-
- /**
- * Appends an entry to the in-memory log and persists it as well.
- *
- * @param replicatedLogEntry the entry to append
- * @param callback the Procedure to be notified when persistence is complete.
- */
- void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
+ * @param callback the Procedure to be notified when persistence is complete (optional).
+ * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
+ * call and the execution of the associated callback. If false, subsequent messages are stashed and get
+ * delivered after persistence is complete and the associated callback is executed.
+ * @return true if the entry was successfully appended, false otherwise.
+ */
+ boolean appendAndPersist(@Nonnull ReplicatedLogEntry replicatedLogEntry,
+ @Nullable Procedure<ReplicatedLogEntry> callback, boolean doAsync);
/**
* Returns a list of log entries starting from the given index to the end of the log.
* @return the size of the entry in bytes.
*/
int size();
+
+ /**
+ * Checks if persistence is pending for this entry.
+ *
+ * @return true if persistence is pending, false otherwise.
+ */
+ boolean isPersistencePending();
+
+ /**
+ * Sets whether or not persistence is pending for this entry.
+ *
+ * @param pending the new setting.
+ */
+ void setPersistencePending(boolean pending);
}
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
/**
class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
private static final int DATA_SIZE_DIVIDER = 5;
- private final Procedure<DeleteEntries> deleteProcedure = NoopProcedure.instance();
-
private final RaftActorContext context;
private long dataSizeSinceLastSnapshot = 0L;
// FIXME: Maybe this should be done after the command is saved
long adjustedIndex = removeFrom(logEntryIndex);
if (adjustedIndex >= 0) {
- context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance());
return true;
}
}
@Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
- appendAndPersist(replicatedLogEntry, null);
- }
-
- @Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
- final Procedure<ReplicatedLogEntry> callback) {
+ public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry,
+ @Nullable final Procedure<ReplicatedLogEntry> callback, boolean doAsync) {
context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
- // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability
- // of the logs
if (!append(replicatedLogEntry)) {
- return;
+ return false;
}
- // When persisting events with persist it is guaranteed that the
- // persistent actor will not receive further commands between the
- // persist call and the execution(s) of the associated event
- // handler. This also holds for multiple persist calls in context
- // of a single command.
- context.getPersistenceProvider().persist(replicatedLogEntry,
- param -> {
- context.getLogger().debug("{}: persist complete {}", context.getId(), param);
-
- int logEntrySize = param.size();
- dataSizeSinceLastSnapshot += logEntrySize;
-
- if (callback != null) {
- callback.apply(param);
- }
+ Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
+ context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+ dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+ if (callback != null) {
+ callback.apply(persistedLogEntry);
}
- );
+ };
+
+ if (doAsync) {
+ context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+ } else {
+ context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+ }
+
+ return true;
}
}
/**
* A {@link ReplicatedLogEntry} implementation.
*/
-public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable {
+public class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable {
private static final long serialVersionUID = -9085798014576489130L;
private final long index;
private final long term;
private final Payload payload;
+ private transient boolean persistencePending = false;
/**
* Constructs an instance.
return getData().size();
}
+ @Override
+ public boolean isPersistencePending() {
+ return persistencePending;
+ }
+
+ @Override
+ public void setPersistencePending(boolean pending) {
+ persistencePending = pending;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + payload.hashCode();
+ result = prime * result + (int) (index ^ index >>> 32);
+ result = prime * result + (int) (term ^ term >>> 32);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ ReplicatedLogImplEntry other = (ReplicatedLogImplEntry) obj;
+ if (payload == null) {
+ if (other.payload != null) {
+ return false;
+ }
+ } else if (!payload.equals(other.payload)) {
+ return false;
+ }
+
+ if (index != other.index) {
+ return false;
+ }
+
+ if (term != other.term) {
+ return false;
+ }
+
+ return true;
+ }
+
@Override
public String toString() {
return "Entry{index=" + index + ", term=" + term + '}';
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Internal message sent to the leader after persistence is complete to check if replication consensus has been reached.
+ *
+ * @author Thomas Pantelis
+ */
+public final class CheckConsensusReached {
+ public static final CheckConsensusReached INSTANCE = new CheckConsensusReached();
+
+ private CheckConsensusReached() {
+ // Hidden on purpose
+ }
+}
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
}
}
- // Now figure out if this reply warrants a change in the commitIndex
- // If there exists an N such that N > commitIndex, a majority
- // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
- // set commitIndex = N (§5.3, §5.4).
if (log.isTraceEnabled()) {
log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
}
+ possiblyUpdateCommitIndex();
+
+    //Send the next log entry immediately, if possible; no need to wait for a heartbeat to trigger that event
+ sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+ return this;
+ }
+
+ private void possiblyUpdateCommitIndex() {
+        // Figure out if we can update the commitIndex as follows:
+ // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+ // and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
for (long index = context.getCommitIndex() + 1; ; index++) {
- int replicatedCount = 1;
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+ if (replicatedLogEntry == null) {
+ log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+ logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().size());
+ break;
+ }
+
+ // Count our entry if it has been persisted.
+ int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+ if (replicatedCount == 0) {
+ // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
+ // we should be able to update the commit index if we get a consensus amongst the followers
+                // w/o the local persistence ack; however, this can cause timing issues with snapshot capture
+ // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
+ // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
+ // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
+ // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
+ // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+ break;
+ }
log.trace("{}: checking Nth index {}", logName(), index);
for (FollowerLogInformation info : followerToLog.values()) {
}
if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
- if (replicatedLogEntry == null) {
- log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
- logName(), index, context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().size());
- break;
- }
-
// Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
// "Raft never commits log entries from previous terms by counting replicas".
// However we keep looping so we can make progress when new entries in the current term
// Apply the change to the state machine
if (context.getCommitIndex() > context.getLastApplied()) {
- if (log.isDebugEnabled()) {
- log.debug(
- "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
- logName(), followerId, context.getCommitIndex(), context.getLastApplied());
- }
+ log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+ context.getCommitIndex(), context.getLastApplied());
applyLogToStateMachine(context.getCommitIndex());
}
if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
-
- //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
- sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
- return this;
}
private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
replicate((Replicate) message);
} else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
+ } else if (message instanceof CheckConsensusReached) {
+ possiblyUpdateCommitIndex();
} else {
return super.handleMessage(sender, message);
}
log.debug("{}: Append entry to log {}", logName(), entry.getData());
- context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
+ context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
if (entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
}
@Override
- public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
- }
-
- @Override
- public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+ public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+ boolean doAsync) {
+ return true;
}
@Override
return removeFrom(index) >= 0;
}
- @Override
- public void appendAndPersist(
- ReplicatedLogEntry replicatedLogEntry) {
- append(replicatedLogEntry);
- }
-
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+ public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+ boolean doAsync) {
append(replicatedLogEntry);
if (callback != null) {
Throwables.propagate(e);
}
}
+
+ return true;
}
}
}
}
- public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
+    // TODO - this class can be removed in favor of using ReplicatedLogImplEntry directly.
+ public static class MockReplicatedLogEntry extends ReplicatedLogImplEntry {
private static final long serialVersionUID = 1L;
- private final long term;
- private final long index;
- private final Payload data;
-
public MockReplicatedLogEntry(long term, long index, Payload data) {
-
- this.term = term;
- this.index = index;
- this.data = data;
- }
-
- @Override public Payload getData() {
- return data;
- }
-
- @Override public long getTerm() {
- return term;
- }
-
- @Override public long getIndex() {
- return index;
- }
-
- @Override
- public int size() {
- return getData().size();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (data == null ? 0 : data.hashCode());
- result = prime * result + (int) (index ^ index >>> 32);
- result = prime * result + (int) (term ^ term >>> 32);
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
- if (data == null) {
- if (other.data != null) {
- return false;
- }
- } else if (!data.equals(other.data)) {
- return false;
- }
- if (index != other.index) {
- return false;
- }
- if (term != other.term) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
- .append(", data=").append(data).append("]");
- return builder.toString();
+ super(index, term, data);
}
}
mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
- verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
+ verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
}
@Test
TEST_LOG.info("testLeaderTransitioning ending");
}
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testReplicateWithPersistencePending() throws Exception {
+ final String leaderId = factory.generateActorId("leader-");
+ final String followerId = factory.generateActorId("follower-");
+
+ final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
+ doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
+
+ TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+ MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config,
+ mockPersistenceProvider), leaderId);
+ MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+ leaderActor.waitForInitializeBehaviorComplete();
+
+ leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+ Leader leader = new Leader(leaderActor.getRaftActorContext());
+ leaderActor.setCurrentBehavior(leader);
+
+ leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
+
+ ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
+ assertNotNull("ReplicatedLogEntry not found", logEntry);
+ assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
+ assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+ leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
+ assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+ ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
+ verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
+
+ callbackCaptor.getValue().apply(logEntry);
+
+ assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
+ assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
+ }
}
}
private void verifyPersist(Object message) throws Exception {
- verifyPersist(message, new Same(message));
+ verifyPersist(message, new Same(message), true);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void verifyPersist(Object message, Matcher<?> matcher) throws Exception {
+ private void verifyPersist(Object message, Matcher<?> matcher, boolean async) throws Exception {
ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
- verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+ if (async) {
+ verify(mockPersistence).persistAsync(Matchers.argThat(matcher), procedure.capture());
+ } else {
+ verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+ }
procedure.getValue().apply(message);
}
MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
- log.appendAndPersist(logEntry1);
+ log.appendAndPersist(logEntry1, null, true);
verifyPersist(logEntry1);
MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
- log.appendAndPersist(logEntry2, mockCallback);
+ log.appendAndPersist(logEntry2, mockCallback, true);
verifyPersist(logEntry2);
Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
- log.appendAndPersist(logEntry, mockCallback);
+ log.appendAndPersist(logEntry, mockCallback, true);
verifyPersist(logEntry);
reset(mockPersistence, mockCallback);
- log.appendAndPersist(logEntry, mockCallback);
+ log.appendAndPersist(logEntry, mockCallback, true);
verifyNoMoreInteractions(mockPersistence, mockCallback);
final MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
final MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
- log.appendAndPersist(logEntry1);
+ log.appendAndPersist(logEntry1, null, true);
verifyPersist(logEntry1);
reset(mockPersistence);
- log.appendAndPersist(logEntry2);
+ log.appendAndPersist(logEntry2, null, true);
verifyPersist(logEntry2);
int dataSize = 600;
MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
- log.appendAndPersist(logEntry);
+ log.appendAndPersist(logEntry, null, true);
verifyPersist(logEntry);
reset(mockPersistence);
logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
- log.appendAndPersist(logEntry);
+ log.appendAndPersist(logEntry, null, true);
verifyPersist(logEntry);
assertEquals("size", 2, log.size());
log.removeFromAndPersist(1);
DeleteEntries deleteEntries = new DeleteEntries(1);
- verifyPersist(deleteEntries, match(deleteEntries));
+ verifyPersist(deleteEntries, match(deleteEntries), false);
assertEquals("size", 1, log.size());
// Configure follower 2 to drop messages and lag.
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
+
// Send 5 payloads - the second should cause a leader snapshot.
final MockPayload payload2 = sendPayloadData(leaderActor, "two");
final MockPayload payload3 = sendPayloadData(leaderActor, "three");
testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: "
+ "sending 1 more payload to trigger second snapshot");
- // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
- Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
- TimeUnit.MILLISECONDS);
-
// Send another payload to trigger a second leader snapshot.
MockPayload payload7 = sendPayloadData(leaderActor, "seven");
follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
+
// Send a payload with a large relative size but not enough to trigger a snapshot.
MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
// Sends 3 payloads with indexes 4, 5 and 6.
- verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
+ long leadersSnapshotIndexOnRecovery = verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
// Recover the leader from persistence and verify.
long leadersLastIndexOnRecovery = 6;
- // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3.
- long leadersSnapshotIndexOnRecovery = 3;
-
- // The recovered journal should have 3 entries starting at index 4.
- long leadersFirstJournalEntryIndexOnRecovery = 4;
+ long leadersFirstJournalEntryIndexOnRecovery = leadersSnapshotIndexOnRecovery + 1;
verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
leadersFirstJournalEntryIndexOnRecovery);
*/
private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
@Nullable ServerConfigurationPayload expServerConfig) throws Exception {
- testLog.info("testInstallSnapshotToLaggingFollower starting");
+ testLog.info("verifyInstallSnapshotToLaggingFollower starting");
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);
- testLog.info("testInstallSnapshotToLaggingFollower complete");
+ testLog.info("verifyInstallSnapshotToLaggingFollower complete");
}
/**
* Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
* snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
*/
- private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
- testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}",
+ private long verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
+ testLog.info(
+ "verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot starting: replicatedToAllIndex: {}",
leader.getReplicatedToAllIndex());
// Send another payload - a snapshot should occur.
// Verify the leader's last persisted snapshot (previous ones may not be purged yet).
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
+ // The last (fourth) payload may or may not have been applied when the snapshot is captured depending on the
+ // timing when the async persistence completes.
List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
- assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
- verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
+ long leadersSnapshotIndex;
+ if (unAppliedEntry.isEmpty()) {
+ leadersSnapshotIndex = 4;
+ expSnapshotState.add(payload4);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 4, currentTerm, 4);
+ } else {
+ leadersSnapshotIndex = 3;
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
+ verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
+ expSnapshotState.add(payload4);
+ }
// Send a couple more payloads.
MockPayload payload5 = sendPayloadData(leaderActor, "five");
// Verify follower 2's log state.
verifyFollowersTrimmedLog(2, follower2Actor, 6);
- expSnapshotState.add(payload4);
expSnapshotState.add(payload5);
expSnapshotState.add(payload6);
- testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending");
+ testLog.info("verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot ending");
+
+ return leadersSnapshotIndex;
}
/**
*/
private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
long firstJournalEntryIndex) {
- testLog.info("testLeaderReinstatement starting");
+ testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
+ + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
killActor(leaderActor);
assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
- testLog.info("testLeaderReinstatement ending");
+ testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
}
private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
if (journalMap != null) {
synchronized (journalMap) {
for (Map.Entry<Long, Object> e: journalMap.entrySet()) {
- builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue());
+ builder.append("\n ").append(e.getKey()).append(" = ").append(deserialize(e.getValue()));
}
}
}