Bug 5419: Persist log entries asycnhronously 41/48441/6
authorTom Pantelis <tpanteli@brocade.com>
Thu, 17 Nov 2016 14:10:23 +0000 (09:10 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 29 Nov 2016 12:44:49 +0000 (12:44 +0000)
Modified RaftActor#persistData to persist the ReplicatedLogEntry
using akka's persistAsync method. This is similar to the persist method
except subsequent messages are delivered prior to persistence completion.
This avoids blocking the RaftActor so it can process AppendEntriesReply
and other messages while persistence is in progress.

In addition, AbstractLeader was modified to only count itself for consensus
when persistence is complete. This required communicating the persistence
complete state to the AbstractLeader. A transient persistencePending flag was
added to the ReplicatedLogImplEntry that is set by RaftActor#persistData
prior to the persist call and is cleared when the persist callback executes.
AbstractLeader checks the flag when counting consensus. It's possible that
the persistence complete event arrives after AppendEntriesReply messages
from replicated followers so a new message, CheckConsensusReached, is sent
by the RaftActor on persistence complete to check if consensus is reached.

Change-Id: If34a5f395d52e17b2737464a2e2403f56a520c43
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CheckConsensusReached.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java

index e3fa649ab15294ae16d87ba9b368375020f27e91..46551506e337182df1c0f7a05e2a6fe1e11043ab 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -262,7 +263,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
-            persistence().persist(applyEntries, NoopProcedure.instance());
+            persistence().persistAsync(applyEntries, NoopProcedure.instance());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -520,32 +521,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
+        replicatedLogEntry.setPersistencePending(true);
 
         LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+        boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+            // Clear the persistence pending flag in the log entry.
+            persistedLogEntry.setPersistencePending(false);
+
             if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
-                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
-                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+                raftContext.setCommitIndex(persistedLogEntry.getIndex());
+                raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+                self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
-                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
 
             } else {
-                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
-                // Send message for replication
-                getCurrentBehavior().handleMessage(getSelf(),
-                        new Replicate(clientActor, identifier, replicatedLogEntry1));
+                // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+                // normally should still be the leader) to check if consensus has now been reached in conjunction with
+                // follower replication.
+                getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
             }
-        });
+        }, true);
+
+        if (wasAppended && hasFollowers()) {
+            // Send log entry for replication.
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+        }
     }
 
     private ReplicatedLog replicatedLog() {
index cd70812d9d38d9c1f112a3c9978dfe4ee9d31e8c..6e34ff0393e8840ea70e0b7d398cc155c257d294 100644 (file)
@@ -91,16 +91,14 @@ public interface ReplicatedLog {
      * Appends an entry to the in-memory log and persists it as well.
      *
      * @param replicatedLogEntry the entry to append
-     */
-    void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
-
-    /**
-     * Appends an entry to the in-memory log and persists it as well.
-     *
-     * @param replicatedLogEntry the entry to append
-     * @param callback the Procedure to be notified when persistence is complete.
-     */
-    void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
+     * @param callback the Procedure to be notified when persistence is complete (optional).
+     * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
+     *        call and the execution of the associated callback. If false, subsequent messages are stashed and get
+     *        delivered after persistence is complete and the associated callback is executed.
+     * @return true if the entry was successfully appended, false otherwise.
+     */
+    boolean appendAndPersist(@Nonnull ReplicatedLogEntry replicatedLogEntry,
+            @Nullable Procedure<ReplicatedLogEntry> callback, boolean doAsync);
 
     /**
      * Returns a list of log entries starting from the given index to the end of the log.
index a09a0a23ac40299bf917a7b220bb0aa2316d1141..1348ffca9163adf786d4f715fce3ddf858605548 100644 (file)
@@ -41,4 +41,18 @@ public interface ReplicatedLogEntry {
      * @return the size of the entry in bytes.
      */
     int size();
+
+    /**
+     * Checks if persistence is pending for this entry.
+     *
+     * @return true if persistence is pending, false otherwise.
+     */
+    boolean isPersistencePending();
+
+    /**
+     * Sets whether or not persistence is pending for this entry.
+     *
+     * @param pending the new setting.
+     */
+    void setPersistencePending(boolean pending);
 }
index 9123c14d5d615553a2abf58d19e9a1d89ad63f39..04606fbbabc61e6f47c9780cd9207c6ebfa5ae54 100644 (file)
@@ -11,6 +11,8 @@ import akka.japi.Procedure;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 
 /**
@@ -19,8 +21,6 @@ import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private static final int DATA_SIZE_DIVIDER = 5;
 
-    private final Procedure<DeleteEntries> deleteProcedure = NoopProcedure.instance();
-
     private final RaftActorContext context;
     private long dataSizeSinceLastSnapshot = 0L;
 
@@ -45,7 +45,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         // FIXME: Maybe this should be done after the command is saved
         long adjustedIndex = removeFrom(logEntryIndex);
         if (adjustedIndex >= 0) {
-            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance());
             return true;
         }
 
@@ -87,38 +87,31 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
-        appendAndPersist(replicatedLogEntry, null);
-    }
-
-    @Override
-    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
+    public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry,
+            @Nullable final Procedure<ReplicatedLogEntry> callback, boolean doAsync)  {
 
         context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
 
-        // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability
-        // of the logs
         if (!append(replicatedLogEntry)) {
-            return;
+            return false;
         }
 
-        // When persisting events with persist it is guaranteed that the
-        // persistent actor will not receive further commands between the
-        // persist call and the execution(s) of the associated event
-        // handler. This also holds for multiple persist calls in context
-        // of a single command.
-        context.getPersistenceProvider().persist(replicatedLogEntry,
-            param -> {
-                context.getLogger().debug("{}: persist complete {}", context.getId(), param);
-
-                int logEntrySize = param.size();
-                dataSizeSinceLastSnapshot += logEntrySize;
-
-                if (callback != null) {
-                    callback.apply(param);
-                }
+        Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
+            context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+            dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+            if (callback != null) {
+                callback.apply(persistedLogEntry);
             }
-        );
+        };
+
+        if (doAsync) {
+            context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+        } else {
+            context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+        }
+
+        return true;
     }
 }
index 1ff69e7f4a0ccf198c11333f013bd1d46b7c910a..80193590dbfab16c6572a259a7b46f13cf894703 100644 (file)
@@ -15,12 +15,13 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 /**
  * A {@link ReplicatedLogEntry} implementation.
  */
-public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable {
+public class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable {
     private static final long serialVersionUID = -9085798014576489130L;
 
     private final long index;
     private final long term;
     private final Payload payload;
+    private transient boolean persistencePending = false;
 
     /**
      * Constructs an instance.
@@ -55,6 +56,60 @@ public final class ReplicatedLogImplEntry implements ReplicatedLogEntry, Seriali
         return getData().size();
     }
 
+    @Override
+    public boolean isPersistencePending() {
+        return persistencePending;
+    }
+
+    @Override
+    public void setPersistencePending(boolean pending) {
+        persistencePending = pending;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + payload.hashCode();
+        result = prime * result + (int) (index ^ index >>> 32);
+        result = prime * result + (int) (term ^ term >>> 32);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        ReplicatedLogImplEntry other = (ReplicatedLogImplEntry) obj;
+        if (payload == null) {
+            if (other.payload != null) {
+                return false;
+            }
+        } else if (!payload.equals(other.payload)) {
+            return false;
+        }
+
+        if (index != other.index) {
+            return false;
+        }
+
+        if (term != other.term) {
+            return false;
+        }
+
+        return true;
+    }
+
     @Override
     public String toString() {
         return "Entry{index=" + index + ", term=" + term + '}';
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CheckConsensusReached.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CheckConsensusReached.java
new file mode 100644 (file)
index 0000000..c3f9c44
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Internal message sent to the leader after persistence is complete to check if replication consensus has been reached.
+ *
+ * @author Thomas Pantelis
+ */
+public final class CheckConsensusReached {
+    public static final CheckConsensusReached INSTANCE = new CheckConsensusReached();
+
+    private CheckConsensusReached() {
+        // Hidden on purpose
+    }
+}
index 31bf99c2dc2bc9f5564c256294c605f6af1bd482..d97905cf11e4f7174bc85505de27b1687da347d2 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -291,17 +292,47 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
         }
 
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, Â§5.4).
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
+        possiblyUpdateCommitIndex();
+
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+        return this;
+    }
+
+    private void possiblyUpdateCommitIndex() {
+        // Figure out if we can update the the commitIndex as follows:
+        //   If there exists an index N such that N > commitIndex, a majority of matchIndex[i] â‰¥ N,
+        //     and log[N].term == currentTerm:
+        //   set commitIndex = N (§5.3, Â§5.4).
         for (long index = context.getCommitIndex() + 1; ; index++) {
-            int replicatedCount = 1;
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+            if (replicatedLogEntry == null) {
+                log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                        logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().size());
+                break;
+            }
+
+            // Count our entry if it has been persisted.
+            int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+            if (replicatedCount == 0) {
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
+                // we should be able to update the commit index if we get a consensus amongst the followers
+                // w/o the local persistence ack however this can cause timing issues with snapshot capture
+                // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
+                // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
+                // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
+                // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
+                // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+                break;
+            }
 
             log.trace("{}: checking Nth index {}", logName(), index);
             for (FollowerLogInformation info : followerToLog.values()) {
@@ -320,14 +351,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
-                if (replicatedLogEntry == null) {
-                    log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
-                            logName(), index, context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().size());
-                    break;
-                }
-
                 // Don't update the commit index if the log entry is from a previous term, as per Â§5.4.1:
                 // "Raft never commits log entries from previous terms by counting replicas".
                 // However we keep looping so we can make progress when new entries in the current term
@@ -349,11 +372,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                    "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
-                    logName(), followerId, context.getCommitIndex(), context.getLastApplied());
-            }
+            log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+                    context.getCommitIndex(), context.getLastApplied());
 
             applyLogToStateMachine(context.getCommitIndex());
         }
@@ -361,11 +381,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
-
-        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
-        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
-        return this;
     }
 
     private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
@@ -448,6 +463,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             replicate((Replicate) message);
         } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof CheckConsensusReached) {
+            possiblyUpdateCommitIndex();
         } else {
             return super.handleMessage(sender, message);
         }
index d4747da09d6df33eaf3290b0a74320e09f1fe836..7e8a7725910d599f7cd56ef40daaa06de2f8bece 100644 (file)
@@ -242,7 +242,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 log.debug("{}: Append entry to log {}", logName(), entry.getData());
 
-                context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
+                context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
 
                 if (entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
index 6c6133f173382a861716f5b18d67e46317682981..bf0eda6a5e6e38b671348e87f4b513e8979d84c1 100644 (file)
@@ -325,11 +325,9 @@ public class AbstractReplicatedLogImplTest {
         }
 
         @Override
-        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
-        }
-
-        @Override
-        public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+        public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+                boolean doAsync) {
+            return true;
         }
 
         @Override
index 95764ad202cfd8a3f3b875cd88edf418d3035dd6..d3af205abe175f7fc77c69e27efcdd11bb695fbd 100644 (file)
@@ -146,15 +146,10 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             return removeFrom(index) >= 0;
         }
 
-        @Override
-        public void appendAndPersist(
-            ReplicatedLogEntry replicatedLogEntry) {
-            append(replicatedLogEntry);
-        }
-
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
-        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+        public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+                boolean doAsync) {
             append(replicatedLogEntry);
 
             if (callback != null) {
@@ -164,6 +159,8 @@ public class MockRaftActorContext extends RaftActorContextImpl {
                     Throwables.propagate(e);
                 }
             }
+
+            return true;
         }
     }
 
@@ -226,81 +223,12 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         }
     }
 
-    public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
+    // TODO - this class can be removed and use ReplicatedLogImplEntry directly.
+    public static class MockReplicatedLogEntry extends ReplicatedLogImplEntry {
         private static final long serialVersionUID = 1L;
 
-        private final long term;
-        private final long index;
-        private final Payload data;
-
         public MockReplicatedLogEntry(long term, long index, Payload data) {
-
-            this.term = term;
-            this.index = index;
-            this.data = data;
-        }
-
-        @Override public Payload getData() {
-            return data;
-        }
-
-        @Override public long getTerm() {
-            return term;
-        }
-
-        @Override public long getIndex() {
-            return index;
-        }
-
-        @Override
-        public int size() {
-            return getData().size();
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + (data == null ? 0 : data.hashCode());
-            result = prime * result + (int) (index ^ index >>> 32);
-            result = prime * result + (int) (term ^ term >>> 32);
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            }
-            if (obj == null) {
-                return false;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
-            MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
-            if (data == null) {
-                if (other.data != null) {
-                    return false;
-                }
-            } else if (!data.equals(other.data)) {
-                return false;
-            }
-            if (index != other.index) {
-                return false;
-            }
-            if (term != other.term) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
-                    .append(", data=").append(data).append("]");
-            return builder.toString();
+            super(index, term, data);
         }
     }
 
index 7477f14168abf82353643ef90c6236e2b4bed895..da0493fe225137469a524e004204de257ff2aed3 100644 (file)
@@ -397,7 +397,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
-        verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
+        verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Procedure.class));
     }
 
     @Test
@@ -1283,4 +1283,49 @@ public class RaftActorTest extends AbstractActorTest {
 
         TEST_LOG.info("testLeaderTransitioning ending");
     }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    public void testReplicateWithPersistencePending() throws Exception {
+        final String leaderId = factory.generateActorId("leader-");
+        final String followerId = factory.generateActorId("follower-");
+
+        final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+        DataPersistenceProvider mockPersistenceProvider = mock(DataPersistenceProvider.class);
+        doReturn(true).when(mockPersistenceProvider).isRecoveryApplicable();
+
+        TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+                MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config,
+                        mockPersistenceProvider), leaderId);
+        MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+        leaderActor.waitForInitializeBehaviorComplete();
+
+        leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+        Leader leader = new Leader(leaderActor.getRaftActorContext());
+        leaderActor.setCurrentBehavior(leader);
+
+        leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
+
+        ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
+        assertNotNull("ReplicatedLogEntry not found", logEntry);
+        assertEquals("isPersistencePending", true, logEntry.isPersistencePending());
+        assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+        leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, 0, 1, (short)0));
+        assertEquals("getCommitIndex", -1, leaderActor.getRaftActorContext().getCommitIndex());
+
+        ArgumentCaptor<Procedure> callbackCaptor = ArgumentCaptor.forClass(Procedure.class);
+        verify(mockPersistenceProvider).persistAsync(eq(logEntry), callbackCaptor.capture());
+
+        callbackCaptor.getValue().apply(logEntry);
+
+        assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
+        assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
+    }
 }
index 75c0d8980be9e414354deb21d4356c6bd193efce..6449f0afc841f7b0e8149c09fa549ad004eceac2 100644 (file)
@@ -62,13 +62,17 @@ public class ReplicatedLogImplTest {
     }
 
     private void verifyPersist(Object message) throws Exception {
-        verifyPersist(message, new Same(message));
+        verifyPersist(message, new Same(message), true);
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void verifyPersist(Object message, Matcher<?> matcher) throws Exception {
+    private void verifyPersist(Object message, Matcher<?> matcher, boolean async) throws Exception {
         ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
-        verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+        if (async) {
+            verify(mockPersistence).persistAsync(Matchers.argThat(matcher), procedure.capture());
+        } else {
+            verify(mockPersistence).persist(Matchers.argThat(matcher), procedure.capture());
+        }
 
         procedure.getValue().apply(message);
     }
@@ -80,7 +84,7 @@ public class ReplicatedLogImplTest {
 
         MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
 
-        log.appendAndPersist(logEntry1);
+        log.appendAndPersist(logEntry1, null, true);
 
         verifyPersist(logEntry1);
 
@@ -90,7 +94,7 @@ public class ReplicatedLogImplTest {
 
         MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
         Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
-        log.appendAndPersist(logEntry2, mockCallback);
+        log.appendAndPersist(logEntry2, mockCallback, true);
 
         verifyPersist(logEntry2);
 
@@ -107,7 +111,7 @@ public class ReplicatedLogImplTest {
         Procedure<ReplicatedLogEntry> mockCallback = Mockito.mock(Procedure.class);
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 1, new MockPayload("1"));
 
-        log.appendAndPersist(logEntry, mockCallback);
+        log.appendAndPersist(logEntry, mockCallback, true);
 
         verifyPersist(logEntry);
 
@@ -115,7 +119,7 @@ public class ReplicatedLogImplTest {
 
         reset(mockPersistence, mockCallback);
 
-        log.appendAndPersist(logEntry, mockCallback);
+        log.appendAndPersist(logEntry, mockCallback, true);
 
         verifyNoMoreInteractions(mockPersistence, mockCallback);
 
@@ -133,12 +137,12 @@ public class ReplicatedLogImplTest {
         final MockReplicatedLogEntry logEntry1 = new MockReplicatedLogEntry(1, 2, new MockPayload("2"));
         final MockReplicatedLogEntry logEntry2 = new MockReplicatedLogEntry(1, 3, new MockPayload("3"));
 
-        log.appendAndPersist(logEntry1);
+        log.appendAndPersist(logEntry1, null, true);
         verifyPersist(logEntry1);
 
         reset(mockPersistence);
 
-        log.appendAndPersist(logEntry2);
+        log.appendAndPersist(logEntry2, null, true);
         verifyPersist(logEntry2);
 
 
@@ -156,14 +160,14 @@ public class ReplicatedLogImplTest {
         int dataSize = 600;
         MockReplicatedLogEntry logEntry = new MockReplicatedLogEntry(1, 2, new MockPayload("2", dataSize));
 
-        log.appendAndPersist(logEntry);
+        log.appendAndPersist(logEntry, null, true);
         verifyPersist(logEntry);
 
         reset(mockPersistence);
 
         logEntry = new MockReplicatedLogEntry(1, 3, new MockPayload("3", 5));
 
-        log.appendAndPersist(logEntry);
+        log.appendAndPersist(logEntry, null, true);
         verifyPersist(logEntry);
 
         assertEquals("size", 2, log.size());
@@ -181,7 +185,7 @@ public class ReplicatedLogImplTest {
         log.removeFromAndPersist(1);
 
         DeleteEntries deleteEntries = new DeleteEntries(1);
-        verifyPersist(deleteEntries, match(deleteEntries));
+        verifyPersist(deleteEntries, match(deleteEntries), false);
 
         assertEquals("size", 1, log.size());
 
index 4211119e12e998a956e54feeecbaf323d96e2d8d..b81836e97a355dfcd1628271343aca815317c633 100644 (file)
@@ -297,6 +297,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Configure follower 2 to drop messages and lag.
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
         // Send 5 payloads - the second should cause a leader snapshot.
         final MockPayload payload2 = sendPayloadData(leaderActor, "two");
         final MockPayload payload3 = sendPayloadData(leaderActor, "three");
@@ -317,10 +321,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: "
                 + "sending 1 more payload to trigger second snapshot");
 
-        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
-        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
-                TimeUnit.MILLISECONDS);
-
         // Send another payload to trigger a second leader snapshot.
         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
 
@@ -402,6 +402,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
         // Send a payload with a large relative size but not enough to trigger a snapshot.
         MockPayload payload1 = sendPayloadData(leaderActor, "one", 500);
 
@@ -462,16 +466,12 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
 
         // Sends 3 payloads with indexes 4, 5 and 6.
-        verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
+        long leadersSnapshotIndexOnRecovery = verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot();
 
         // Recover the leader from persistence and verify.
         long leadersLastIndexOnRecovery = 6;
 
-        // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3.
-        long leadersSnapshotIndexOnRecovery = 3;
-
-        // The recovered journal should have 3 entries starting at index 4.
-        long leadersFirstJournalEntryIndexOnRecovery = 4;
+        long leadersFirstJournalEntryIndexOnRecovery = leadersSnapshotIndexOnRecovery + 1;
 
         verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery,
                 leadersFirstJournalEntryIndexOnRecovery);
@@ -528,7 +528,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
      */
     private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
             @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
-        testLog.info("testInstallSnapshotToLaggingFollower starting");
+        testLog.info("verifyInstallSnapshotToLaggingFollower starting");
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
 
@@ -614,15 +614,16 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
-        testLog.info("testInstallSnapshotToLaggingFollower complete");
+        testLog.info("verifyInstallSnapshotToLaggingFollower complete");
     }
 
     /**
      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
      */
-    private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
-        testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}",
+    private long verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception {
+        testLog.info(
+                "verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot starting: replicatedToAllIndex: {}",
                 leader.getReplicatedToAllIndex());
 
         // Send another payload - a snapshot should occur.
@@ -637,10 +638,21 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
+        // The last (fourth) payload may or may not have been applied when the snapshot is captured depending on the
+        // timing when the async persistence completes.
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
-        assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
-        verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
+        long leadersSnapshotIndex;
+        if (unAppliedEntry.isEmpty()) {
+            leadersSnapshotIndex = 4;
+            expSnapshotState.add(payload4);
+            verifySnapshot("Persisted", persistedSnapshot, currentTerm, 4, currentTerm, 4);
+        } else {
+            leadersSnapshotIndex = 3;
+            verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4);
+            assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
+            verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
+            expSnapshotState.add(payload4);
+        }
 
         // Send a couple more payloads.
         MockPayload payload5 = sendPayloadData(leaderActor, "five");
@@ -701,11 +713,12 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify follower 2's log state.
         verifyFollowersTrimmedLog(2, follower2Actor, 6);
 
-        expSnapshotState.add(payload4);
         expSnapshotState.add(payload5);
         expSnapshotState.add(payload6);
 
-        testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending");
+        testLog.info("verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot ending");
+
+        return leadersSnapshotIndex;
     }
 
     /**
@@ -713,7 +726,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
      */
     private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
             long firstJournalEntryIndex) {
-        testLog.info("testLeaderReinstatement starting");
+        testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
+            + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
 
         killActor(leaderActor);
 
@@ -741,7 +755,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState());
 
-        testLog.info("testLeaderReinstatement ending");
+        testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
     }
 
     private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
index f2a216c821d43098ad3c43454a1111439c818756..8c6cf0945713ed0cbae3d5f7fbb2870220985dc5 100644 (file)
@@ -109,7 +109,7 @@ public class InMemoryJournal extends AsyncWriteJournal {
         if (journalMap != null) {
             synchronized (journalMap) {
                 for (Map.Entry<Long, Object> e: journalMap.entrySet()) {
-                    builder.append("\n    ").append(e.getKey()).append(" = ").append(e.getValue());
+                    builder.append("\n    ").append(e.getKey()).append(" = ").append(deserialize(e.getValue()));
                 }
             }
         }