Merge "Bug-2301 - Clustering:Snapshots need not be stored in in-mem ReplicatedLog...
authorMoiz Raja <moraja@cisco.com>
Fri, 7 Nov 2014 17:03:06 +0000 (17:03 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 7 Nov 2014 17:03:06 +0000 (17:03 +0000)
15 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf

index 81b4670b71cb0eb728bed02d9c1da48163fc9b1f..2437bb4b7d381ff34a58138a14a7723ee7289a64 100644 (file)
@@ -27,7 +27,9 @@ import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
@@ -76,7 +78,15 @@ public class ExampleActor extends RaftActor {
 
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+                String followers = "";
+                if (getRaftState() == RaftState.Leader) {
+                    followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+                } else {
+                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+                }
+
+
             }
 
         } else {
index 6192cad2307b99896326619233bf9903f7aeb41a..2faae48838abfdb068504463ccb823b243a05891 100644 (file)
@@ -15,7 +15,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
     @Override
     public long getSnapshotBatchCount() {
-        return 50;
+        return 25;
     }
 
     @Override
index 2be4a0c36f91bb41be4d21f79f928ef1c0bfe71f..a2c9d660ad8d2472f7b9a83ab3f175b1883ad1f6 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import com.google.protobuf.ByteString;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -20,27 +18,23 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     // We define this as ArrayList so we can use ensureCapacity.
     protected ArrayList<ReplicatedLogEntry> journal;
-    protected ByteString snapshot;
+
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
     protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
-    protected ByteString previousSnapshot;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
+    public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
-        this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
         this.journal = new ArrayList<>(unAppliedEntries);
     }
 
-
     public AbstractReplicatedLogImpl() {
-        this.snapshot = null;
         this.journal = new ArrayList<>();
     }
 
@@ -154,11 +148,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
-    @Override
-    public ByteString getSnapshot() {
-        return snapshot;
-    }
-
     @Override
     public long getSnapshotIndex() {
         return snapshotIndex;
@@ -185,18 +174,13 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         this.snapshotTerm = snapshotTerm;
     }
 
-    @Override
-    public void setSnapshot(ByteString snapshot) {
-        this.snapshot = snapshot;
-    }
-
     @Override
     public void clear(int startIndex, int endIndex) {
         journal.subList(startIndex, endIndex).clear();
     }
 
     @Override
-    public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+    public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
         snapshottedJournal = new ArrayList<>(journal.size());
 
         snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
@@ -207,9 +191,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
         previousSnapshotTerm = snapshotTerm;
         setSnapshotTerm(snapshotCapturedTerm);
-
-        previousSnapshot = getSnapshot();
-        setSnapshot(snapshot);
     }
 
     @Override
@@ -217,7 +198,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
-        previousSnapshot = null;
     }
 
     @Override
@@ -231,9 +211,5 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
         snapshotTerm = previousSnapshotTerm;
         previousSnapshotTerm = -1;
-
-        snapshot = previousSnapshot;
-        previousSnapshot = null;
-
     }
 }
index f3de9835385eb0880bb567402f5bb493f8ad3a76..2c4304d404a57ca919cb0c71f274588c6654f6e5 100644 (file)
@@ -61,5 +61,16 @@ public interface FollowerLogInformation {
      */
     public AtomicLong getMatchIndex();
 
+    /**
+     * Checks if the follower is active by comparing the last updated with the duration
+     * @return boolean
+     */
+    public boolean isFollowerActive();
+
+    /**
+     * restarts the timeout clock of the follower
+     */
+    public void markFollowerActive();
+
 
 }
index 94f9a53a850ae22537fe0607b5a16a1328b12532..c0cfd7e862e189b42036a246cc81f32e3d253e01 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.base.Stopwatch;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation{
@@ -18,11 +22,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation{
 
     private final AtomicLong matchIndex;
 
+    private final Stopwatch stopwatch;
+
+    private final long followerTimeoutMillis;
+
     public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
-        AtomicLong matchIndex) {
+        AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
         this.id = id;
         this.nextIndex = nextIndex;
         this.matchIndex = matchIndex;
+        this.stopwatch = new Stopwatch();
+        this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
     }
 
     public long incrNextIndex(){
@@ -57,4 +67,17 @@ public class FollowerLogInformationImpl implements FollowerLogInformation{
         return matchIndex;
     }
 
+    @Override
+    public boolean isFollowerActive() {
+        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+        return (stopwatch.isRunning()) && (elapsed <= followerTimeoutMillis);
+    }
+
+    @Override
+    public void markFollowerActive() {
+        if (stopwatch.isRunning()) {
+            stopwatch.reset();
+        }
+        stopwatch.start();
+    }
 }
index 2459c2ff8b1764d3cd3b56be90fc7ea5191d65b6..f02b52beb9aa2c6aa14bbd5cc71dae725690f817 100644 (file)
@@ -18,6 +18,7 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
@@ -30,6 +31,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
@@ -388,6 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     public java.util.Set<String> getPeers() {
+
         return context.getPeerAddresses().keySet();
     }
 
@@ -636,7 +640,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         //be greedy and remove entries from in-mem journal which are in the snapshot
         // and update snapshotIndex and snapshotTerm without waiting for the success,
 
-        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+        context.getReplicatedLog().snapshotPreCommit(
             captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
@@ -644,16 +648,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             "and term:{}", captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
+        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
+            // this would be call straight to the leader and won't initiate in serialization
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+        }
+
         captureSnapshot = null;
         hasSnapshotCaptureInitiated = false;
     }
 
-
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(ByteString.copyFrom(snapshot.getState()),
-                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
 
@@ -843,4 +850,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    @VisibleForTesting
+    void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
+        currentBehavior = behavior;
+    }
+
+    protected RaftActorBehavior getCurrentBehavior() {
+        return currentBehavior;
+    }
 }
index 85893333c2892eadf43e9aacc374f574c1b4a03a..7ee85322a60f9ec93a4effc6e9379bd52d77a6ed 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import com.google.protobuf.ByteString;
-
 import java.util.List;
 
 /**
@@ -122,13 +120,6 @@ public interface ReplicatedLog {
      */
     boolean isInSnapshot(long index);
 
-    /**
-     * Get the snapshot
-     *
-     * @return an object representing the snapshot if it exists. null otherwise
-     */
-    ByteString getSnapshot();
-
     /**
      * Get the index of the snapshot
      *
@@ -156,12 +147,6 @@ public interface ReplicatedLog {
      */
     public void setSnapshotTerm(long snapshotTerm);
 
-    /**
-     * sets the snapshot in bytes
-     * @param snapshot
-     */
-    public void setSnapshot(ByteString snapshot);
-
     /**
      * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
      * @param startIndex
@@ -172,12 +157,10 @@ public interface ReplicatedLog {
     /**
      * Handles all the bookkeeping in order to perform a rollback in the
      * event of SaveSnapshotFailure
-     * @param snapshot
      * @param snapshotCapturedIndex
      * @param snapshotCapturedTerm
      */
-    public void snapshotPreCommit(ByteString snapshot,
-        long snapshotCapturedIndex, long snapshotCapturedTerm);
+    public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
 
     /**
      * Sets the Replicated log to state after snapshot success.
index bb86e1a37d9e1b1bb481dbcf78c48792d9ce312b..d4dd3350f30b120bf965c885e3319152db9a2c38 100644 (file)
@@ -13,13 +13,20 @@ public class CaptureSnapshot {
     private long lastAppliedTerm;
     private long lastIndex;
     private long lastTerm;
+    private boolean installSnapshotInitiated;
 
     public CaptureSnapshot(long lastIndex, long lastTerm,
         long lastAppliedIndex, long lastAppliedTerm) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false);
+    }
+
+    public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
+        long lastAppliedTerm, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
+        this.installSnapshotInitiated = installSnapshotInitiated;
     }
 
     public long getLastAppliedIndex() {
@@ -37,4 +44,8 @@ public class CaptureSnapshot {
     public long getLastTerm() {
         return lastTerm;
     }
+
+    public boolean isInstallSnapshotInitiated() {
+        return installSnapshotInitiated;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/InitiateInstallSnapshot.java
new file mode 100644 (file)
index 0000000..7844914
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Internal message by Leader to initiate an install snapshot
+ */
+public class InitiateInstallSnapshot {
+}
+
index 6c3313f316623c10fcdf7468e1181d5941044201..83c85d9135a773541b40889ecf81453d9607833b 100644 (file)
@@ -8,7 +8,16 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
-import java.io.Serializable;
+import com.google.protobuf.ByteString;
 
-public class SendInstallSnapshot implements Serializable {
+public class SendInstallSnapshot {
+    private ByteString snapshot;
+
+    public SendInstallSnapshot(ByteString snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    public ByteString getSnapshot() {
+        return snapshot;
+    }
 }
index de748675a7aae0ce4ab617cc8d0d698725dd3e6c..ef104e7f58527a4be47bda4351e69f00e320786c 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -20,6 +22,8 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -66,8 +70,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Leader extends AbstractRaftActorBehavior {
 
 
-    protected final Map<String, FollowerLogInformation> followerToLog =
-        new HashMap();
+    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private final Set<String> followers;
@@ -79,6 +82,8 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private final int minReplicationCount;
 
+    private Optional<ByteString> snapshot;
+
     public Leader(RaftActorContext context) {
         super(context);
 
@@ -88,7 +93,8 @@ public class Leader extends AbstractRaftActorBehavior {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
                     new AtomicLong(context.getCommitIndex()),
-                    new AtomicLong(-1));
+                    new AtomicLong(-1),
+                    context.getConfigParams().getElectionTimeOutInterval());
 
             followerToLog.put(followerId, followerLogInformation);
         }
@@ -103,6 +109,7 @@ public class Leader extends AbstractRaftActorBehavior {
             minReplicationCount = 0;
         }
 
+        snapshot = Optional.absent();
 
         // Immediately schedule a heartbeat
         // Upon election: send initial empty AppendEntries RPCs
@@ -117,6 +124,15 @@ public class Leader extends AbstractRaftActorBehavior {
 
     }
 
+    private Optional<ByteString> getSnapshot() {
+        return snapshot;
+    }
+
+    @VisibleForTesting
+    void setSnapshot(Optional<ByteString> snapshot) {
+        this.snapshot = snapshot;
+    }
+
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
@@ -146,6 +162,8 @@ public class Leader extends AbstractRaftActorBehavior {
             return this;
         }
 
+        followerLogInformation.markFollowerActive();
+
         if (appendEntriesReply.isSuccess()) {
             followerLogInformation
                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
@@ -246,10 +264,18 @@ public class Leader extends AbstractRaftActorBehavior {
             if (message instanceof SendHeartBeat) {
                 sendHeartBeat();
                 return this;
-            } else if(message instanceof SendInstallSnapshot) {
+
+            } else if(message instanceof InitiateInstallSnapshot) {
                 installSnapshotIfNeeded();
+
+            } else if(message instanceof SendInstallSnapshot) {
+                // received from RaftActor
+                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+                sendInstallSnapshot();
+
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
+
             } else if (message instanceof InstallSnapshotReply){
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
@@ -263,8 +289,9 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
-        FollowerToSnapshot followerToSnapshot =
-            mapFollowerToSnapshot.get(followerId);
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+        followerLogInformation.markFollowerActive();
 
         if (followerToSnapshot != null &&
             followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
@@ -280,8 +307,6 @@ public class Leader extends AbstractRaftActorBehavior {
                         );
                     }
 
-                    FollowerLogInformation followerLogInformation =
-                        followerToLog.get(followerId);
                     followerLogInformation.setMatchIndex(
                         context.getReplicatedLog().getSnapshotIndex());
                     followerLogInformation.setNextIndex(
@@ -293,6 +318,12 @@ public class Leader extends AbstractRaftActorBehavior {
                             followerToLog.get(followerId).getNextIndex().get());
                     }
 
+                    if (mapFollowerToSnapshot.isEmpty()) {
+                        // once there are no pending followers receiving snapshots
+                        // we can remove snapshot from the memory
+                        setSnapshot(Optional.<ByteString>absent());
+                    }
+
                 } else {
                     followerToSnapshot.markSendStatus(true);
                 }
@@ -344,64 +375,87 @@ public class Leader extends AbstractRaftActorBehavior {
             if (followerActor != null) {
                 FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
                 long followerNextIndex = followerLogInformation.getNextIndex().get();
-                List<ReplicatedLogEntry> entries = Collections.emptyList();
+                boolean isFollowerActive = followerLogInformation.isFollowerActive();
+                List<ReplicatedLogEntry> entries = null;
 
                 if (mapFollowerToSnapshot.get(followerId) != null) {
-                    if (mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    // if install snapshot is in process , then sent next chunk if possible
+                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
                         sendSnapshotChunk(followerActor, followerId);
+                    } else {
+                        // we send a heartbeat even if we have not received a reply for the last chunk
+                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                            Collections.<ReplicatedLogEntry>emptyList());
                     }
 
                 } else {
+                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
 
-                    if (context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    if (isFollowerActive &&
+                        context.getReplicatedLog().isPresent(followerNextIndex)) {
                         // FIXME : Sending one entry at a time
                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
-                        followerActor.tell(
-                            new AppendEntries(currentTerm(), context.getId(),
-                                prevLogIndex(followerNextIndex),
-                                prevLogTerm(followerNextIndex), entries,
-                                context.getCommitIndex()).toSerializable(),
-                            actor()
-                        );
-
-                    } else {
-                        // if the followers next index is not present in the leaders log, then snapshot should be sent
-                        long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-                        long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                        if (followerNextIndex >= 0 && leaderLastIndex >= followerNextIndex ) {
-                            // if the follower is just not starting and leader's index
-                            // is more than followers index
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("SendInstallSnapshot to follower:{}," +
-                                        "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                        "leader-last-index:{}", followerId,
-                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                                );
-                            }
-
-                            actor().tell(new SendInstallSnapshot(), actor());
-                        } else {
-                            followerActor.tell(
-                                new AppendEntries(currentTerm(), context.getId(),
-                                    prevLogIndex(followerNextIndex),
-                                    prevLogTerm(followerNextIndex), entries,
-                                    context.getCommitIndex()).toSerializable(),
-                                actor()
+                    } else if (isFollowerActive && followerNextIndex >= 0 &&
+                        leaderLastIndex >= followerNextIndex ) {
+                        // if the followers next index is not present in the leaders log, and
+                        // if the follower is just not starting and if leader's index is more than followers index
+                        // then snapshot should be sent
+
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                    "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
                             );
                         }
+                        actor().tell(new InitiateInstallSnapshot(), actor());
+
+                        // we would want to sent AE as the capture snapshot might take time
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+
+                    } else {
+                        //we send an AppendEntries, even if the follower is inactive
+                        // in-order to update the followers timestamp, in case it becomes active again
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
                     }
+
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
                 }
             }
         }
     }
 
+    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+        List<ReplicatedLogEntry> entries) {
+        followerActor.tell(
+            new AppendEntries(currentTerm(), context.getId(),
+                prevLogIndex(followerNextIndex),
+                prevLogTerm(followerNextIndex), entries,
+                context.getCommitIndex()).toSerializable(),
+            actor()
+        );
+    }
+
     /**
      * An installSnapshot is scheduled at a interval that is a multiple of
      * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
      * snapshots at every heartbeat.
+     *
+     * Install Snapshot works as follows
+     * 1. Leader sends a InitiateInstallSnapshot message to self
+     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+     * 5. On complete, Follower sends back a InstallSnapshotReply.
+     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+     * and replenishes the memory by deleting the snapshot in Replicated log.
+     *
      */
-    private void installSnapshotIfNeeded(){
+    private void installSnapshotIfNeeded() {
         for (String followerId : followers) {
             ActorSelection followerActor =
                 context.getPeerActorSelection(followerId);
@@ -412,6 +466,58 @@ public class Leader extends AbstractRaftActorBehavior {
 
                 long nextIndex = followerLogInformation.getNextIndex().get();
 
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    LOG.info("{} follower needs a snapshot install", followerId);
+                    if (snapshot.isPresent()) {
+                        // if a snapshot is present in the memory, most likely another install is in progress
+                        // no need to capture snapshot
+                        sendSnapshotChunk(followerActor, followerId);
+
+                    } else {
+                        initiateCaptureSnapshot();
+                        //we just need 1 follower who would need snapshot to be installed.
+                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+                        // who needs an install and send to all who need
+                        break;
+                    }
+
+                }
+            }
+        }
+    }
+
+    // on every install snapshot, we try to capture the snapshot.
+    // Once a capture is going on, another one issued will get ignored by RaftActor.
+    private void initiateCaptureSnapshot() {
+        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+        long lastAppliedIndex = -1;
+        long lastAppliedTerm = -1;
+
+        if (lastAppliedEntry != null) {
+            lastAppliedIndex = lastAppliedEntry.getIndex();
+            lastAppliedTerm = lastAppliedEntry.getTerm();
+        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        boolean isInstallSnapshotInitiated = true;
+        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+            actor());
+    }
+
+
+    private void sendInstallSnapshot() {
+        for (String followerId : followers) {
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
                     context.getReplicatedLog().isInSnapshot(nextIndex)) {
                     sendSnapshotChunk(followerActor, followerId);
@@ -426,20 +532,21 @@ public class Leader extends AbstractRaftActorBehavior {
      */
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    context.getReplicatedLog().getSnapshotIndex(),
-                    context.getReplicatedLog().getSnapshotTerm(),
-                    getNextSnapshotChunk(followerId,
-                        context.getReplicatedLog().getSnapshot()),
-                    mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks()
-                ).toSerializable(),
-                actor()
-            );
-            LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                mapFollowerToSnapshot.get(followerId).getTotalChunks());
+            if (snapshot.isPresent()) {
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        getNextSnapshotChunk(followerId,snapshot.get()),
+                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                    ).toSerializable(),
+                    actor()
+                );
+                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+            }
         } catch (IOException e) {
             LOG.error(e, "InstallSnapshot failed for Leader.");
         }
@@ -456,10 +563,9 @@ public class Leader extends AbstractRaftActorBehavior {
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
         }
-
         return nextChunk;
     }
 
@@ -495,14 +601,11 @@ public class Leader extends AbstractRaftActorBehavior {
         // Scheduling the heartbeat only once here because heartbeats do not
         // need to be sent if there are other messages being sent to the remote
         // actor.
-        heartbeatSchedule =
-            context.getActorSystem().scheduler().scheduleOnce(
-                interval,
-                context.getActor(), new SendHeartBeat(),
-                context.getActorSystem().dispatcher(), context.getActor());
+        heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+            interval, context.getActor(), new SendHeartBeat(),
+            context.getActorSystem().dispatcher(), context.getActor());
     }
 
-
     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
         if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
@@ -517,7 +620,7 @@ public class Leader extends AbstractRaftActorBehavior {
         installSnapshotSchedule =
             context.getActorSystem().scheduler().scheduleOnce(
                 interval,
-                context.getActor(), new SendInstallSnapshot(),
+                context.getActor(), new InitiateInstallSnapshot(),
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
@@ -628,4 +731,19 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
+    // called from example-actor for printing the follower-states
+    public String printFollowerStates() {
+        StringBuilder sb = new StringBuilder();
+        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+        }
+        return "[" + sb.toString() + "]";
+    }
+
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        followerToLog.get(followerId).markFollowerActive();
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
new file mode 100644 (file)
index 0000000..7df9f37
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FollowerLogInformationImplTest {
+
+    @Test
+    public void testIsFollowerActive() {
+
+        FiniteDuration timeoutDuration =
+            new FiniteDuration(500, TimeUnit.MILLISECONDS);
+
+        FollowerLogInformation followerLogInformation =
+            new FollowerLogInformationImpl(
+                "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+
+
+
+        assertFalse("Follower should be termed inactive before stopwatch starts",
+            followerLogInformation.isFollowerActive());
+
+        followerLogInformation.markFollowerActive();
+        if (sleepWithElaspsedTimeReturned(200) > 200) {
+          return;
+        }
+        assertTrue("Follower should be active", followerLogInformation.isFollowerActive());
+
+        if (sleepWithElaspsedTimeReturned(400) > 400) {
+            return;
+        }
+        assertFalse("Follower should be inactive after time lapsed",
+            followerLogInformation.isFollowerActive());
+
+        followerLogInformation.markFollowerActive();
+        assertTrue("Follower should be active from inactive",
+            followerLogInformation.isFollowerActive());
+    }
+
+    // we cannot rely comfortably that the sleep will indeed sleep for the desired time
+    // hence getting the actual elapsed time and do a match.
+    // if the sleep has spilled over, then return the test gracefully
+    private long sleepWithElaspsedTimeReturned(long millis) {
+        Stopwatch stopwatch = new Stopwatch();
+        stopwatch.start();
+        Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS);
+        stopwatch.stop();
+        return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    }
+}
index ca864eb426ca48925b4abc27d0fed75e9bde7ebc..87e40f236cf5c6dfee6e03ea711d61d342d9b345 100644 (file)
@@ -31,6 +31,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -557,7 +559,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
@@ -600,7 +601,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
 
-
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
             }
@@ -646,8 +646,9 @@ public class RaftActorTest extends AbstractActorTest {
 
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
-                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
-                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                    MockRaftActor.props(persistenceId,Collections.EMPTY_MAP,
+                        Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -659,6 +660,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
 
+                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+
+                mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
+
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
 
                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
@@ -698,6 +703,9 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
+                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+                mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
+
                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
 
                 verify(mockRaftActor.delegate).createSnapshot();
@@ -710,8 +718,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(dataPersistenceProvider).deleteMessages(100);
 
-                assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot());
-
                 assertEquals(2, mockRaftActor.getReplicatedLog().size());
 
                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
@@ -755,8 +761,6 @@ public class RaftActorTest extends AbstractActorTest {
 
             }
         };
-
-
     }
 
     @Test
@@ -780,13 +784,15 @@ public class RaftActorTest extends AbstractActorTest {
 
                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
-                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class)));
+                oldReplicatedLog.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
+                        mock(Payload.class)));
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+                    new MockRaftActorContext.MockPayload("A"),
+                    new MockRaftActorContext.MockPayload("B"),
+                    new MockRaftActorContext.MockPayload("C"),
+                    new MockRaftActorContext.MockPayload("D")));
 
                 Snapshot snapshot = mock(Snapshot.class);
 
@@ -798,9 +804,11 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
 
-                assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog());
+                assertTrue("The replicatedLog should have changed",
+                    oldReplicatedLog != mockRaftActor.getReplicatedLog());
 
-                assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied());
+                assertEquals("lastApplied should be same as in the snapshot",
+                    (Long) 3L, mockRaftActor.getLastApplied());
 
                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
 
@@ -833,6 +841,10 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
+                RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
+
+                mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
+
                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
@@ -843,8 +855,6 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
 
-                assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot());
-
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
             }
index 030b4b23763a15807f1e3ec143ea1f39bcc9c731..705c69607c658d16d03086b573230923e2ba05b4 100644 (file)
@@ -1,15 +1,14 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -17,6 +16,8 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -29,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -36,7 +38,7 @@ import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -72,8 +74,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap();
 
@@ -155,10 +156,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         }.get(); // this extracts the received message
 
                     assertEquals("match", out);
-
                 }
-
-
             };
         }};
     }
@@ -214,229 +212,360 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testSendInstallSnapshot() {
-        new LeaderTestKit(getSystem()) {{
+    public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(leaderActor);
+            actorContext.setPeerAddresses(peerAddresses);
 
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext(getRef());
-                    actorContext.setPeerAddresses(peerAddresses);
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
 
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setCommitIndex(followersLastIndex);
+            //set follower timeout to 2 mins, helps during debugging
+            actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            MockLeader leader = new MockLeader(actorContext);
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
 
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(
-                        toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            //update follower timestamp
+            leader.markFollowerActive(followerActor.path().toString());
 
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
 
-                    // new entry
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                            new MockRaftActorContext.MockPayload("D"));
+            //send first chunk and no InstallSnapshotReply received yet
+            leader.getFollowerToSnapshot().getNextChunk();
+            leader.getFollowerToSnapshot().incrementChunkIndex();
 
-                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-                    RaftActorBehavior raftBehavior = leader.handleMessage(
-                        senderActor, new Replicate(null, "state-id", entry));
+            leader.handleMessage(leaderActor, new SendHeartBeat());
 
-                    assertTrue(raftBehavior instanceof Leader);
+            AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
+                followerActor, AppendEntries.SERIALIZABLE_CLASS);
 
-                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
-                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
-                        @Override
-                        protected Boolean match(Object o) throws Exception {
-                            if (o instanceof SendInstallSnapshot) {
-                                return true;
-                            }
-                            return false;
-                        }
-                    }.get();
+            assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
+                "received", aeproto);
 
-                    boolean sendInstallSnapshotReceived = false;
-                    for (Boolean b: matches) {
-                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
-                    }
+            AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+
+            assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+
+            //InstallSnapshotReply received
+            leader.getFollowerToSnapshot().markSendStatus(true);
+
+            leader.handleMessage(senderActor, new SendHeartBeat());
+
+            InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
+                MessageCollectorActor.getFirstMatching(followerActor,
+                    InstallSnapshot.SERIALIZABLE_CLASS);
+
+            assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
+                isproto);
+
+            InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
 
-                    assertTrue(sendInstallSnapshotReceived);
+            assertEquals(snapshotIndex, is.getLastIncludedIndex());
 
+        }};
+    }
+
+    @Test
+    public void testSendAppendEntriesSnapshotScenario() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(getRef());
+            actorContext.setPeerAddresses(peerAddresses);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            Leader leader = new Leader(actorContext);
+
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
+
+            //update follower timestamp
+            leader.markFollowerActive(followerActor.path().toString());
+
+            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+            RaftActorBehavior raftBehavior = leader.handleMessage(
+                senderActor, new Replicate(null, "state-id", entry));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
+            Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                @Override
+                protected Boolean match(Object o) throws Exception {
+                    if (o instanceof InitiateInstallSnapshot) {
+                        return true;
+                    }
+                    return false;
                 }
-            };
+            }.get();
+
+            boolean initiateInitiateInstallSnapshot = false;
+            for (Boolean b: matches) {
+                initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
+            }
+
+            assertTrue(initiateInitiateInstallSnapshot);
         }};
     }
 
     @Test
-    public void testInstallSnapshot() {
-        new LeaderTestKit(getSystem()) {{
+    public void testInitiateInstallSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
-                    actorContext.setPeerAddresses(peerAddresses);
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
 
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(leaderActor);
+            actorContext.setPeerAddresses(peerAddresses);
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
 
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setLastApplied(3);
+            actorContext.setCommitIndex(followersLastIndex);
 
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+            Leader leader = new Leader(actorContext);
+            // set the snapshot as absent and check if capture-snapshot is invoked.
+            leader.setSnapshot(Optional.<ByteString>absent());
 
-                    // new entry
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                            new MockRaftActorContext.MockPayload("D"));
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
 
-                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
+            actorContext.getReplicatedLog().append(entry);
 
-                    assertTrue(raftBehavior instanceof Leader);
+            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+            RaftActorBehavior raftBehavior = leader.handleMessage(
+                leaderActor, new InitiateInstallSnapshot());
 
-                    // check if installsnapshot gets called with the correct values.
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            protected String match(Object in) {
-                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
-                                    InstallSnapshot is = (InstallSnapshot)
-                                        SerializationUtils.fromSerializable(in);
-                                    if (is.getData() == null) {
-                                        return "InstallSnapshot data is null";
-                                    }
-                                    if (is.getLastIncludedIndex() != snapshotIndex) {
-                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
-                                    }
-                                    if (is.getLastIncludedTerm() != snapshotTerm) {
-                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
-                                    }
-                                    if (is.getTerm() == currentTerm) {
-                                        return is.getTerm() + "!=" + currentTerm;
-                                    }
+            CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+                getFirstMatching(leaderActor, CaptureSnapshot.class);
 
-                                    return "match";
+            assertNotNull(cs);
 
-                               } else {
-                                    return "message mismatch:" + in.getClass();
-                                }
+            assertTrue(cs.isInstallSnapshotInitiated());
+            assertEquals(3, cs.getLastAppliedIndex());
+            assertEquals(1, cs.getLastAppliedTerm());
+            assertEquals(4, cs.getLastIndex());
+            assertEquals(2, cs.getLastTerm());
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            actorContext.setPeerAddresses(peerAddresses);
+
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+            actorContext.setCommitIndex(followersLastIndex);
+
+            Leader leader = new Leader(actorContext);
+
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
+
+            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            // check if installsnapshot gets called with the correct values.
+            final String out =
+                new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    // do not put code outside this method, will run afterwards
+                    protected String match(Object in) {
+                        if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                            InstallSnapshot is = (InstallSnapshot)
+                                SerializationUtils.fromSerializable(in);
+                            if (is.getData() == null) {
+                                return "InstallSnapshot data is null";
+                            }
+                            if (is.getLastIncludedIndex() != snapshotIndex) {
+                                return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                            }
+                            if (is.getLastIncludedTerm() != snapshotTerm) {
+                                return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                            }
+                            if (is.getTerm() == currentTerm) {
+                                return is.getTerm() + "!=" + currentTerm;
                             }
-                        }.get(); // this extracts the received message
 
-                    assertEquals("match", out);
-                }
-            };
+                            return "match";
+
+                        } else {
+                            return "message mismatch:" + in.getClass();
+                        }
+                    }
+                }.get(); // this extracts the received message
+
+            assertEquals("match", out);
         }};
     }
 
     @Test
     public void testHandleInstallSnapshotReplyLastChunk() {
-        new LeaderTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+        new JavaTestKit(getSystem()) {{
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
-                    actorContext.setPeerAddresses(peerAddresses);
+            Map<String, String> peerAddresses = new HashMap();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
-
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
-
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
-
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(
-                        toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
-                    ByteString bs = toByteString(leadersSnapshot);
-                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
-                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
-                        leader.getFollowerToSnapshot().getNextChunk();
-                        leader.getFollowerToSnapshot().incrementChunkIndex();
-                    }
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
 
-                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
-                            leader.getFollowerToSnapshot().getChunkIndex(), true));
+            MockLeader leader = new MockLeader(actorContext);
 
-                    assertTrue(raftBehavior instanceof Leader);
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-                    assertEquals(leader.followerToLog.size(), 1);
-                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
-                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
-                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
-                }
-            };
+            // set the snapshot variables in replicatedlog
+
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+            while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                leader.getFollowerToSnapshot().getNextChunk();
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                    leader.getFollowerToSnapshot().getChunkIndex(), true));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+            assertEquals(leader.followerToLog.size(), 1);
+            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex().get());
+            assertEquals(snapshotIndex, fli.getMatchIndex().get());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
         }};
     }
 
@@ -584,6 +713,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             followerActorContext.setCommitIndex(1);
 
             Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive(followerActor.path().toString());
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
@@ -652,6 +782,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             followerActorContext.setCommitIndex(2);
 
             Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive(followerActor.path().toString());
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
@@ -816,30 +947,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     }
 
-    private static class LeaderTestKit extends JavaTestKit {
-
-        private LeaderTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
-        }
-
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-            new JavaTestKit.EventFilter<Boolean>(logLevel
-            ) {
-                @Override
-                protected Boolean run() {
-                    return true;
-                }
-            }.from(subject.path().toString())
-                .message(logMessage)
-                .occurrences(1).exec();
-
-            Assert.assertEquals(true, result);
-
-        }
-    }
-
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;
@@ -848,14 +955,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             super(context);
         }
 
-        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(nextIndex),
-                    new AtomicLong(matchIndex));
-            followerToLog.put(followerId, followerLogInformation);
-        }
-
         public FollowerToSnapshot getFollowerToSnapshot() {
             return fts;
         }
@@ -866,4 +965,26 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         }
     }
+
+    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+        private long electionTimeOutIntervalMillis;
+        private int snapshotChunkSize;
+
+        public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+            super();
+            this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+            this.snapshotChunkSize = snapshotChunkSize;
+        }
+
+        @Override
+        public FiniteDuration getElectionTimeOutInterval() {
+            return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int getSnapshotChunkSize() {
+            return snapshotChunkSize;
+        }
+    }
 }
index 8a45108f8b675b2442e2dd1ecb9195472095bda5..818ddf7d8572889e924a364c1203ca4892eb6029 100644 (file)
@@ -7,7 +7,7 @@ akka {
 
     actor {
         # enable to test serialization only.
-        serialize-messages = on
+        serialize-messages = off
 
         serializers {
           java  = "akka.serialization.JavaSerializer"