From: Tom Pantelis Date: Thu, 17 Nov 2016 14:10:23 +0000 (-0500) Subject: Bug 5419: Persist log entries asynchronously X-Git-Tag: release/carbon~380 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=fa96da71c5ab10973a9f93c2e8b35494b96fd7ed Bug 5419: Persist log entries asynchronously Modified RaftActor#persistData to persist the ReplicatedLogEntry using akka's persistAsync method. This is similar to the persist method except subsequent messages are delivered prior to persistence completion. This avoids blocking the RaftActor so it can process AppendEntriesReply and other messages while persistence is in progress. In addition, AbstractLeader was modified to only count itself for consensus when persistence is complete. This required communicating the persistence complete state to the AbstractLeader. A transient persistencePending flag was added to the ReplicatedLogImplEntry that is set by RaftActor#persistData prior to the persist call and is cleared when the persist callback executes. AbstractLeader checks the flag when counting consensus. It's possible that the persistence complete event arrives after AppendEntriesReply messages from replicated followers so a new message, CheckConsensusReached, is sent by the RaftActor on persistence complete to check if consensus is reached. 
Change-Id: If34a5f395d52e17b2737464a2e2403f56a520c43 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index e3fa649ab1..46551506e3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -262,7 +263,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); - persistence().persist(applyEntries, NoopProcedure.instance()); + persistence().persistAsync(applyEntries, NoopProcedure.instance()); } else if (message instanceof FindLeader) { getSender().tell( @@ -520,32 +521,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); + replicatedLogEntry.setPersistencePending(true); 
LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> { + // Clear the persistence pending flag in the log entry. + persistedLogEntry.setPersistencePending(false); + if (!hasFollowers()) { // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); - raftContext.setLastApplied(replicatedLogEntry1.getIndex()); + raftContext.setCommitIndex(persistedLogEntry.getIndex()); + raftContext.setLastApplied(persistedLogEntry.getIndex()); // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); + self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self()); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); + self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self()); } else { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); - // Send message for replication - getCurrentBehavior().handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry1)); + // Local persistence is complete so send the CheckConsensusReached message to the behavior (which + // normally should still be the leader) to check if consensus has now been reached in conjunction with + // follower replication. + getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE); } - }); + }, true); + + if (wasAppended && hasFollowers()) { + // Send log entry for replication. 
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry)); + } } private ReplicatedLog replicatedLog() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index cd70812d9d..6e34ff0393 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -91,16 +91,14 @@ public interface ReplicatedLog { * Appends an entry to the in-memory log and persists it as well. * * @param replicatedLogEntry the entry to append - */ - void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry); - - /** - * Appends an entry to the in-memory log and persists it as well. - * - * @param replicatedLogEntry the entry to append - * @param callback the Procedure to be notified when persistence is complete. - */ - void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback); + * @param callback the Procedure to be notified when persistence is complete (optional). + * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist + * call and the execution of the associated callback. If false, subsequent messages are stashed and get + * delivered after persistence is complete and the associated callback is executed. + * @return true if the entry was successfully appended, false otherwise. + */ + boolean appendAndPersist(@Nonnull ReplicatedLogEntry replicatedLogEntry, + @Nullable Procedure callback, boolean doAsync); /** * Returns a list of log entries starting from the given index to the end of the log. 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java index a09a0a23ac..1348ffca91 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java @@ -41,4 +41,18 @@ public interface ReplicatedLogEntry { * @return the size of the entry in bytes. */ int size(); + + /** + * Checks if persistence is pending for this entry. + * + * @return true if persistence is pending, false otherwise. + */ + boolean isPersistencePending(); + + /** + * Sets whether or not persistence is pending for this entry. + * + * @param pending the new setting. + */ + void setPersistencePending(boolean pending); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java index 9123c14d5d..04606fbbab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java @@ -11,6 +11,8 @@ import akka.japi.Procedure; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; /** @@ -19,8 +21,6 @@ import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; class ReplicatedLogImpl extends AbstractReplicatedLogImpl { private static final int DATA_SIZE_DIVIDER = 5; - private final Procedure 
deleteProcedure = NoopProcedure.instance(); - private final RaftActorContext context; private long dataSizeSinceLastSnapshot = 0L; @@ -45,7 +45,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { // FIXME: Maybe this should be done after the command is saved long adjustedIndex = removeFrom(logEntryIndex); if (adjustedIndex >= 0) { - context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure); + context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance()); return true; } @@ -87,38 +87,31 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl { } @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) { - appendAndPersist(replicatedLogEntry, null); - } - - @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry, - final Procedure callback) { + public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry, + @Nullable final Procedure callback, boolean doAsync) { context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry); - // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability - // of the logs if (!append(replicatedLogEntry)) { - return; + return false; } - // When persisting events with persist it is guaranteed that the - // persistent actor will not receive further commands between the - // persist call and the execution(s) of the associated event - // handler. This also holds for multiple persist calls in context - // of a single command. 
- context.getPersistenceProvider().persist(replicatedLogEntry, - param -> { - context.getLogger().debug("{}: persist complete {}", context.getId(), param); - - int logEntrySize = param.size(); - dataSizeSinceLastSnapshot += logEntrySize; - - if (callback != null) { - callback.apply(param); - } + Procedure persistCallback = persistedLogEntry -> { + context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry); + + dataSizeSinceLastSnapshot += persistedLogEntry.size(); + + if (callback != null) { + callback.apply(persistedLogEntry); } - ); + }; + + if (doAsync) { + context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback); + } else { + context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback); + } + + return true; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java index 1ff69e7f4a..80193590db 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java @@ -15,12 +15,13 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa /** * A {@link ReplicatedLogEntry} implementation. */ -public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable { +public class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable { private static final long serialVersionUID = -9085798014576489130L; private final long index; private final long term; private final Payload payload; + private transient boolean persistencePending = false; /** * Constructs an instance. 
@@ -55,6 +56,60 @@ public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Seriali return getData().size(); } + @Override + public boolean isPersistencePending() { + return persistencePending; + } + + @Override + public void setPersistencePending(boolean pending) { + persistencePending = pending; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + payload.hashCode(); + result = prime * result + (int) (index ^ index >>> 32); + result = prime * result + (int) (term ^ term >>> 32); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + ReplicatedLogImplEntry other = (ReplicatedLogImplEntry) obj; + if (payload == null) { + if (other.payload != null) { + return false; + } + } else if (!payload.equals(other.payload)) { + return false; + } + + if (index != other.index) { + return false; + } + + if (term != other.term) { + return false; + } + + return true; + } + @Override public String toString() { return "Entry{index=" + index + ", term=" + term + '}'; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CheckConsensusReached.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CheckConsensusReached.java new file mode 100644 index 0000000000..c3f9c44fb1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CheckConsensusReached.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.base.messages; + +/** + * Internal message sent to the leader after persistence is complete to check if replication consensus has been reached. + * + * @author Thomas Pantelis + */ +public final class CheckConsensusReached { + public static final CheckConsensusReached INSTANCE = new CheckConsensusReached(); + + private CheckConsensusReached() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 31bf99c2dc..d97905cf11 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -291,17 +292,47 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > 
commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). if (log.isTraceEnabled()) { log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}", logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm()); } + possiblyUpdateCommitIndex(); + + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + + return this; + } + + private void possiblyUpdateCommitIndex() { + // Figure out if we can update the the commitIndex as follows: + // If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N, + // and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). for (long index = context.getCommitIndex() + 1; ; index++) { - int replicatedCount = 1; + ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); + if (replicatedLogEntry == null) { + log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", + logName(), index, context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().size()); + break; + } + + // Count our entry if it has been persisted. + int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; + + if (replicatedCount == 0) { + // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally + // we should be able to update the commit index if we get a consensus amongst the followers + // w/o the local persistence ack however this can cause timing issues with snapshot capture + // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's + // unapplied entries. 
This can happen if the lastAppliedIndex is updated but the corresponding + // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't + // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState + // message in lieu of synchronously updating lastAppliedIndex and applying to state. + break; + } log.trace("{}: checking Nth index {}", logName(), index); for (FollowerLogInformation info : followerToLog.values()) { @@ -320,14 +351,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index); - if (replicatedLogEntry == null) { - log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}", - logName(), index, context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().size()); - break; - } - // Don't update the commit index if the log entry is from a previous term, as per §5.4.1: // "Raft never commits log entries from previous terms by counting replicas". 
// However we keep looping so we can make progress when new entries in the current term @@ -349,11 +372,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { - if (log.isDebugEnabled()) { - log.debug( - "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}", - logName(), followerId, context.getCommitIndex(), context.getLastApplied()); - } + log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(), + context.getCommitIndex(), context.getLastApplied()); applyLogToStateMachine(context.getCommitIndex()); } @@ -361,11 +381,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (!context.getSnapshotManager().isCapturing()) { purgeInMemoryLog(); } - - //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); - - return this; } private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation, @@ -448,6 +463,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof CheckConsensusReached) { + possiblyUpdateCommitIndex(); } else { return super.handleMessage(sender, message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index d4747da09d..7e8a772591 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ 
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -242,7 +242,7 @@ public class Follower extends AbstractRaftActorBehavior { log.debug("{}: Append entry to log {}", logName(), entry.getData()); - context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback); + context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false); if (entry.getData() instanceof ServerConfigurationPayload) { context.updatePeerIds((ServerConfigurationPayload)entry.getData()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java index 6c6133f173..bf0eda6a5e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -325,11 +325,9 @@ public class AbstractReplicatedLogImplTest { } @Override - public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback) { - } - - @Override - public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) { + public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback, + boolean doAsync) { + return true; } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 95764ad202..d3af205abe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ 
b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -146,15 +146,10 @@ public class MockRaftActorContext extends RaftActorContextImpl { return removeFrom(index) >= 0; } - @Override - public void appendAndPersist( - ReplicatedLogEntry replicatedLogEntry) { - append(replicatedLogEntry); - } - @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback) { + public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure callback, + boolean doAsync) { append(replicatedLogEntry); if (callback != null) { @@ -164,6 +159,8 @@ public class MockRaftActorContext extends RaftActorContextImpl { Throwables.propagate(e); } } + + return true; } } @@ -226,81 +223,12 @@ public class MockRaftActorContext extends RaftActorContextImpl { } } - public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable { + // TODO - this class can be removed and use ReplicatedLogImplEntry directly. + public static class MockReplicatedLogEntry extends ReplicatedLogImplEntry { private static final long serialVersionUID = 1L; - private final long term; - private final long index; - private final Payload data; - public MockReplicatedLogEntry(long term, long index, Payload data) { - - this.term = term; - this.index = index; - this.data = data; - } - - @Override public Payload getData() { - return data; - } - - @Override public long getTerm() { - return term; - } - - @Override public long getIndex() { - return index; - } - - @Override - public int size() { - return getData().size(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (data == null ? 
0 : data.hashCode()); - result = prime * result + (int) (index ^ index >>> 32); - result = prime * result + (int) (term ^ term >>> 32); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj; - if (data == null) { - if (other.data != null) { - return false; - } - } else if (!data.equals(other.data)) { - return false; - } - if (index != other.index) { - return false; - } - if (term != other.term) { - return false; - } - return true; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index) - .append(", data=").append(data).append("]"); - return builder.toString(); + super(index, term, data); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 7477f14168..da0493fe22 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -397,7 +397,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10)); - verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class)); + verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class)); } @Test @@ -1283,4 +1283,49 @@ public class RaftActorTest extends AbstractActorTest { TEST_LOG.info("testLeaderTransitioning ending"); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void 
testReplicateWithPersistencePending() throws Exception { + final String leaderId = factory.generateActorId("leader-"); + final String followerId = factory.generateActorId("follower-"); + + final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class); + doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable(); + + TestActorRef leaderActorRef = factory.createTestActor( + MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config, + mockPersistenceProvider), leaderId); + MockRaftActor leaderActor = leaderActorRef.underlyingActor(); + leaderActor.waitForInitializeBehaviorComplete(); + + leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1")); + + ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0); + assertNotNull("ReplicatedLogEntry not found", logEntry); + assertEquals("isPersistencePending", true, logEntry.isPersistencePending()); + assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex()); + + leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0)); + assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex()); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Procedure.class); + verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture()); + + callbackCaptor.getValue().apply(logEntry); + + 
assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex()); + assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied()); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java index 75c0d8980b..6449f0afc8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java @@ -62,13 +62,17 @@ public class ReplicatedLogImplTest { } private void verifyPersist(Object message) throws Exception { - verifyPersist(message, new Same(message)); + verifyPersist(message, new Same(message), true); } @SuppressWarnings({ "unchecked", "rawtypes" }) - private void verifyPersist(Object message, Matcher matcher) throws Exception { + private void verifyPersist(Object message, Matcher matcher, boolean async) throws Exception { ArgumentCaptor procedure = ArgumentCaptor.forClass(Procedure.class); - verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture()); + if (async) { + verify(mockPersistence).persistAsync(Matchers.argThat(matcher), procedure.capture()); + } else { + verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture()); + } procedure.getValue().apply(message); } @@ -80,7 +84,7 @@ public class ReplicatedLogImplTest { MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 1, new MockPayload("1")); - log.appendAndPersist(logEntry1); + log.appendAndPersist(logEntry1, null, true); verifyPersist(logEntry1); @@ -90,7 +94,7 @@ public class ReplicatedLogImplTest { MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 2, new MockPayload("2")); Procedure mockCallback = Mockito.mock(Procedure.class); 
- log.appendAndPersist(logEntry2, mockCallback); + log.appendAndPersist(logEntry2, mockCallback, true); verifyPersist(logEntry2); @@ -107,7 +111,7 @@ public class ReplicatedLogImplTest { Procedure mockCallback = Mockito.mock(Procedure.class); MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1")); - log.appendAndPersist(logEntry, mockCallback); + log.appendAndPersist(logEntry, mockCallback, true); verifyPersist(logEntry); @@ -115,7 +119,7 @@ public class ReplicatedLogImplTest { reset(mockPersistence, mockCallback); - log.appendAndPersist(logEntry, mockCallback); + log.appendAndPersist(logEntry, mockCallback, true); verifyNoMoreInteractions(mockPersistence, mockCallback); @@ -133,12 +137,12 @@ public class ReplicatedLogImplTest { final MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2")); final MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3")); - log.appendAndPersist(logEntry1); + log.appendAndPersist(logEntry1, null, true); verifyPersist(logEntry1); reset(mockPersistence); - log.appendAndPersist(logEntry2); + log.appendAndPersist(logEntry2, null, true); verifyPersist(logEntry2); @@ -156,14 +160,14 @@ public class ReplicatedLogImplTest { int dataSize = 600; MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize)); - log.appendAndPersist(logEntry); + log.appendAndPersist(logEntry, null, true); verifyPersist(logEntry); reset(mockPersistence); logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5)); - log.appendAndPersist(logEntry); + log.appendAndPersist(logEntry, null, true); verifyPersist(logEntry); assertEquals("size", 2, log.size()); @@ -181,7 +185,7 @@ public class ReplicatedLogImplTest { log.removeFromAndPersist(1); DeleteEntries deleteEntries = new DeleteEntries(1); - verifyPersist(deleteEntries, match(deleteEntries)); + verifyPersist(deleteEntries, match(deleteEntries), false); 
assertEquals("size", 1, log.size()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index 4211119e12..b81836e97a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -297,6 +297,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Configure follower 2 to drop messages and lag. follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader. + Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5, + TimeUnit.MILLISECONDS); + // Send 5 payloads - the second should cause a leader snapshot. final MockPayload payload2 = sendPayloadData(leaderActor, "two"); final MockPayload payload3 = sendPayloadData(leaderActor, "three"); @@ -317,10 +321,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: " + "sending 1 more payload to trigger second snapshot"); - // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader. - Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5, - TimeUnit.MILLISECONDS); - // Send another payload to trigger a second leader snapshot. 
MockPayload payload7 = sendPayloadData(leaderActor, "seven"); @@ -402,6 +402,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader. + Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5, + TimeUnit.MILLISECONDS); + // Send a payload with a large relative size but not enough to trigger a snapshot. MockPayload payload1 = sendPayloadData(leaderActor, "one", 500); @@ -462,16 +466,12 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot(); // Sends 3 payloads with indexes 4, 5 and 6. - verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot(); + long leadersSnapshotIndexOnRecovery = verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot(); // Recover the leader from persistence and verify. long leadersLastIndexOnRecovery = 6; - // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3. - long leadersSnapshotIndexOnRecovery = 3; - - // The recovered journal should have 3 entries starting at index 4. 
- long leadersFirstJournalEntryIndexOnRecovery = 4; + long leadersFirstJournalEntryIndexOnRecovery = leadersSnapshotIndexOnRecovery + 1; verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery, leadersFirstJournalEntryIndexOnRecovery); @@ -528,7 +528,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A */ private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex, @Nullable ServerConfigurationPayload expServerConfig) throws Exception { - testLog.info("testInstallSnapshotToLaggingFollower starting"); + testLog.info("verifyInstallSnapshotToLaggingFollower starting"); MessageCollectorActor.clearMessages(leaderCollectorActor); @@ -614,15 +614,16 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); - testLog.info("testInstallSnapshotToLaggingFollower complete"); + testLog.info("verifyInstallSnapshotToLaggingFollower complete"); } /** * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower. */ - private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception { - testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}", + private long verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception { + testLog.info( + "verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); // Send another payload - a snapshot should occur. @@ -637,10 +638,21 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's last persisted snapshot (previous ones may not be purged yet). 
List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4); + // The last (fourth) payload may or may not have been applied when the snapshot is captured depending on the + // timing when the async persistence completes. List unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); - assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); - verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); + long leadersSnapshotIndex; + if (unAppliedEntry.isEmpty()) { + leadersSnapshotIndex = 4; + expSnapshotState.add(payload4); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 4, currentTerm, 4); + } else { + leadersSnapshotIndex = 3; + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4); + assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); + verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); + expSnapshotState.add(payload4); + } // Send a couple more payloads. MockPayload payload5 = sendPayloadData(leaderActor, "five"); @@ -701,11 +713,12 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify follower 2's log state. 
verifyFollowersTrimmedLog(2, follower2Actor, 6); - expSnapshotState.add(payload4); expSnapshotState.add(payload5); expSnapshotState.add(payload6); - testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending"); + testLog.info("verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot ending"); + + return leadersSnapshotIndex; } /** @@ -713,7 +726,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A */ private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, long firstJournalEntryIndex) { - testLog.info("testLeaderReinstatement starting"); + testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, " + + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex); killActor(leaderActor); @@ -741,7 +755,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState()); - testLog.info("testLeaderReinstatement ending"); + testLog.info("verifyLeaderRecoveryAfterReinstatement ending"); } private void sendInitialPayloadsReplicatedToAllFollowers(String... 
data) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index f2a216c821..8c6cf09457 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -109,7 +109,7 @@ public class InMemoryJournal extends AsyncWriteJournal { if (journalMap != null) { synchronized (journalMap) { for (Map.Entry e: journalMap.entrySet()) { - builder.append("\n ").append(e.getKey()).append(" = ").append(e.getValue()); + builder.append("\n ").append(e.getKey()).append(" = ").append(deserialize(e.getValue())); } } }