Bug-2692:Real snapshots should use the replicatedToAllIndex for clearing in-mem log 31/15131/12
authorKamal Rameshan <kramesha@cisco.com>
Wed, 11 Feb 2015 07:24:55 +0000 (23:24 -0800)
committerKamal Rameshan <kramesha@cisco.com>
Tue, 17 Feb 2015 20:17:01 +0000 (12:17 -0800)
Similar to fake snapshots, real snapshots should also be using the replicatedToAllIndex.

replicatedToAllIndex is part of the RaftActorBehavior.

A performSnapshotWithoutCapture helper method is part of the AbstractRaftActorBehavior.

On CaptureSnapshotReply, we clear the log based on replicatedToAllIndex and the corresponding term.

Many of the existing tests seem to test out this changes and some were added and enhanced.

Bug-2715: Fix in-mem log cleanup for an Inactive follower

In case if one of the follower is down, rely on the memory usage to clear the log.

This is done in CaptureSnapshotReply

Change-Id: Ifb3ca19691f20735e3f84b968a7adf01398c20e0
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index d1c3fefee8309208a6df6fe9b539a10ee000ddef..e2aa16918e8ed0354f8543f8edd74b89474e4072 100644 (file)
@@ -41,7 +41,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     protected int adjustedIndex(long logEntryIndex) {
-        if(snapshotIndex < 0){
+        if (snapshotIndex < 0) {
             return (int) logEntryIndex;
         }
         return (int) (logEntryIndex - (snapshotIndex + 1));
@@ -134,6 +134,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
        return journal.size();
     }
 
+    @Override
+    public int dataSize() {
+        return dataSize;
+    }
+
     @Override
     public boolean isPresent(long logEntryIndex) {
         if (logEntryIndex > lastIndex()) {
@@ -200,6 +205,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
         dataSize = 0;
+        // need to recalc the datasize based on the entries left after precommit.
+        for(ReplicatedLogEntry logEntry : journal) {
+            dataSize += logEntry.size();
+        }
+
     }
 
     @Override
index 1f446c72f1ee9eccef904866236c1ec5ea235fb9..353d0b4a241f844f1338435e712202211263e3d7 100644 (file)
@@ -573,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
@@ -668,12 +668,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-        //be greedy and remove entries from in-mem journal which are in the snapshot
-        // and update snapshotIndex and snapshotTerm without waiting for the success,
+        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+        if (context.getReplicatedLog().dataSize() > dataThreshold) {
+            // if memory is less, clear the log based on lastApplied.
+            // this could/should only happen if one of the followers is down
+            // as normally we keep removing from the log when its replicated to all.
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
 
-        context.getReplicatedLog().snapshotPreCommit(
-            captureSnapshot.getLastAppliedIndex(),
-            captureSnapshot.getLastAppliedTerm());
+        } else {
+            // clear the log based on replicatedToAllIndex
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                    captureSnapshot.getReplicatedToAllTerm());
+        }
+        getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
 
         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
@@ -717,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
 
-                @Override public void apply(DeleteEntries param)
-                    throws Exception {
+                @Override
+                public void apply(DeleteEntries param)
+                        throws Exception {
                     //FIXME : Doing nothing for now
                     dataSize = 0;
-                    for(ReplicatedLogEntry entry : journal){
+                    for (ReplicatedLogEntry entry : journal) {
                         dataSize += entry.size();
                     }
                 }
@@ -735,11 +745,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(replicatedLogEntry, null);
         }
 
-        @Override
-        public int dataSize() {
-            return dataSize;
-        }
-
         public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
@@ -766,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex()+1;
+                        long journalSize = lastIndex() + 1;
 
                         if(!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
@@ -817,12 +822,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
-                            getSelf().tell(new CaptureSnapshot(
-                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
                                 null);
                             context.setSnapshotCaptureInitiated(true);
                         }
-                        if(callback != null){
+                        if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
                     }
index 80b7ad90d05dbb1e9fe45ec57a65a44dedfe4463..82d0839bee772bd8efba88a8d1392ab1d336ff1c 100644 (file)
@@ -145,14 +145,14 @@ public interface ReplicatedLog {
      * sets snapshot term
      * @param snapshotTerm
      */
-    public void setSnapshotTerm(long snapshotTerm);
+    void setSnapshotTerm(long snapshotTerm);
 
     /**
      * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
      * @param startIndex
      * @param endIndex
      */
-    public void clear(int startIndex, int endIndex);
+    void clear(int startIndex, int endIndex);
 
     /**
      * Handles all the bookkeeping in order to perform a rollback in the
@@ -160,20 +160,21 @@ public interface ReplicatedLog {
      * @param snapshotCapturedIndex
      * @param snapshotCapturedTerm
      */
-    public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
+    void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
 
     /**
      * Sets the Replicated log to state after snapshot success.
      */
-    public void snapshotCommit();
+    void snapshotCommit();
 
     /**
      * Restores the replicated log to a state in the event of a save snapshot failure
      */
-    public void snapshotRollback();
+    void snapshotRollback();
 
     /**
      * Size of the data in the log (in bytes)
      */
-    public int dataSize();
+    int dataSize();
+
 }
index d4dd3350f30b120bf965c885e3319152db9a2c38..a96b1e435cf88df201be0c344908143b29166334 100644 (file)
@@ -14,19 +14,23 @@ public class CaptureSnapshot {
     private long lastIndex;
     private long lastTerm;
     private boolean installSnapshotInitiated;
+    private long replicatedToAllIndex;
+    private long replicatedToAllTerm;
 
     public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false);
+        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
     }
 
     public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, boolean installSnapshotInitiated) {
+        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
         this.installSnapshotInitiated = installSnapshotInitiated;
+        this.replicatedToAllIndex = replicatedToAllIndex;
+        this.replicatedToAllTerm = replicatedToAllTerm;
     }
 
     public long getLastAppliedIndex() {
@@ -48,4 +52,12 @@ public class CaptureSnapshot {
     public boolean isInstallSnapshotInitiated() {
         return installSnapshotInitiated;
     }
+
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
+    public long getReplicatedToAllTerm() {
+        return replicatedToAllTerm;
+    }
 }
index 9e4f3b46c45b5de54458770b910b63f6c70305a8..94c38f6108eabf62ef9e41a3f484a6e2f915129d 100644 (file)
@@ -91,8 +91,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
-    private long replicatedToAllIndex = -1;
-
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -236,15 +234,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void purgeInMemoryLog() {
-        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        //find the lowest index across followers which has been replicated to all.
+        // lastApplied if there are no followers, so that we keep clearing the log for single-node
         // we would delete the in-mem log from that index on, in-order to minimize mem usage
         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
-        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
         for (FollowerLogInformation info : followerToLog.values()) {
             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
         }
 
-        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+        super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
     @Override
@@ -472,7 +471,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
 
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex >= followerNextIndex) {
+                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
@@ -507,7 +506,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), replicatedToAllIndex);
+            context.getCommitIndex(), super.getReplicatedToAllIndex());
 
         if(!entries.isEmpty()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
@@ -518,7 +517,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
-     * /**
      * Install Snapshot works as follows
      * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
      * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
@@ -533,10 +531,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * @param followerNextIndex
      */
     private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
-        }
-
         if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
                 context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
 
@@ -557,15 +551,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if (lastAppliedEntry != null) {
                     lastAppliedIndex = lastAppliedEntry.getIndex();
                     lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
                     lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
                     lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
                 }
 
                 boolean isInstallSnapshotInitiated = true;
-                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-                        actor());
+                long replicatedToAllIndex = super.getReplicatedToAllIndex();
+                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+                    isInstallSnapshotInitiated), actor());
                 context.setSnapshotCaptureInitiated(true);
             }
         }
@@ -605,8 +602,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         nextSnapshotChunk,
-                            followerToSnapshot.incrementChunkIndex(),
-                            followerToSnapshot.getTotalChunks(),
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
                         Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
index 075b2873e45332364c09aee83c49e6b23e40780c..8f433d529a8c12e81079a8463dd968e377dc330d 100644 (file)
@@ -58,12 +58,23 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected String leaderId = null;
 
+    private long replicatedToAllIndex = -1;
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
         this.LOG = context.getLogger();
     }
 
+    @Override
+    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+        this.replicatedToAllIndex = replicatedToAllIndex;
+    }
+
+    @Override
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
     /**
      * Derived classes should not directly handle AppendEntries messages it
      * should let the base class handle it first. Once the base class handles
@@ -423,17 +434,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     }
 
-    protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
 
+    /**
+     * Performs a snapshot with no capture on the replicated log.
+     * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+     *
+     * @param snapshotCapturedIndex
+     */
+    protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
         //  we would want to keep the lastApplied as its used while capturing snapshots
-        long tempMin = Math.min(minReplicatedToAllIndex,
-                (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+        long lastApplied = context.getLastApplied();
+        long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 
         if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
-            context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+            //use the term of the temp-min, since we check for isPresent, entry will not be null
+            ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+            context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
             context.getReplicatedLog().snapshotCommit();
-            return tempMin;
+            setReplicatedToAllIndex(tempMin);
         }
-        return currentReplicatedIndex;
     }
+
 }
index 8a0788702d81f3dfe04d713bcc99e527273c82d2..675543f2d1449514088b4b4868bd2b77a51a36bc 100644 (file)
@@ -255,7 +255,7 @@ public class Follower extends AbstractRaftActorBehavior {
             lastIndex(), lastTerm()), actor());
 
         if (!context.isSnapshotCaptureInitiated()) {
-            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+            super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
         return this;
index 064cd8b88c8b68152f04cdc25378c792eb263a7b..b766e0ce39fe9be1b847f54c625c802d036a7a0a 100644 (file)
@@ -50,4 +50,16 @@ public interface RaftActorBehavior extends AutoCloseable{
      * @return
      */
     String getLeaderId();
+
+    /**
+     * setting the index of the log entry which is replicated to all nodes
+     * @param replicatedToAllIndex
+     */
+    void setReplicatedToAllIndex(long replicatedToAllIndex);
+
+    /**
+     * getting the index of the log entry which is replicated to all nodes
+     * @return
+     */
+    long getReplicatedToAllIndex();
 }
index ffd8edfbe15fa3aad7a9f237a053b23017946337..885c3ab1094eb68d677fad1728fcbf6046e8f127 100644 (file)
@@ -20,6 +20,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+
 /**
 *
 */
@@ -130,11 +131,17 @@ public class AbstractReplicatedLogImplTest {
 
     @Test
     public void testSnapshotPreCommit() {
+        //add 4 more entries
         replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
 
+        //sending negative values should not cause any changes
+        replicatedLogImpl.snapshotPreCommit(-1, -1);
+        assertEquals(8, replicatedLogImpl.size());
+        assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+
         replicatedLogImpl.snapshotPreCommit(4, 3);
         assertEquals(3, replicatedLogImpl.size());
         assertEquals(4, replicatedLogImpl.getSnapshotIndex());
@@ -152,7 +159,31 @@ public class AbstractReplicatedLogImplTest {
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
 
+    }
+
+    @Test
+    public void testIsPresent() {
+        assertTrue(replicatedLogImpl.isPresent(0));
+        assertTrue(replicatedLogImpl.isPresent(1));
+        assertTrue(replicatedLogImpl.isPresent(2));
+        assertTrue(replicatedLogImpl.isPresent(3));
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("D")));
+        replicatedLogImpl.snapshotPreCommit(3, 2); //snapshot on 3
+        replicatedLogImpl.snapshotCommit();
+
+        assertFalse(replicatedLogImpl.isPresent(0));
+        assertFalse(replicatedLogImpl.isPresent(1));
+        assertFalse(replicatedLogImpl.isPresent(2));
+        assertFalse(replicatedLogImpl.isPresent(3));
+        assertTrue(replicatedLogImpl.isPresent(4));
+
+        replicatedLogImpl.snapshotPreCommit(4, 2); //snapshot on 4
+        replicatedLogImpl.snapshotCommit();
+        assertFalse(replicatedLogImpl.isPresent(4));
 
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("D")));
+        assertTrue(replicatedLogImpl.isPresent(5));
     }
 
     // create a snapshot for test
index 59046fd779fb1319ed4d8f48bb8f6911b9eae301..1cd8550be75677810a07bd79db13ebf272a4d250 100644 (file)
@@ -691,7 +691,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
 
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
 
@@ -737,7 +737,8 @@ public class RaftActorTest extends AbstractActorTest {
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
+                long replicatedToAllIndex = 1;
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
 
                 verify(mockRaftActor.delegate).createSnapshot();
 
@@ -749,13 +750,16 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(dataPersistenceProvider).deleteMessages(100);
 
-                assertEquals(2, mockRaftActor.getReplicatedLog().size());
+                assertEquals(3, mockRaftActor.getReplicatedLog().size());
+                assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
 
+                assertNotNull(mockRaftActor.getReplicatedLog().get(2));
                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
 
                 // Index 2 will not be in the log because it was removed due to snapshotting
-                assertNull(mockRaftActor.getReplicatedLog().get(2));
+                assertNull(mockRaftActor.getReplicatedLog().get(1));
+                assertNull(mockRaftActor.getReplicatedLog().get(0));
 
             }
         };
@@ -870,7 +874,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -962,7 +966,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
-                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1));
+                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+
                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(leaderActor.delegate).createSnapshot();
 
@@ -1040,19 +1045,21 @@ public class RaftActorTest extends AbstractActorTest {
 
                 followerActor.waitForInitializeBehaviorComplete();
 
-                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
                 Follower follower = new Follower(followerActor.getRaftActorContext());
                 followerActor.setCurrentBehavior(follower);
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
 
+                // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
 
-                // log as indices 0-5
+                // log has indices 0-5
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
                 //snapshot on 4
-                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1));
+                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+
                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(followerActor.delegate).createSnapshot();
 
@@ -1090,7 +1097,7 @@ public class RaftActorTest extends AbstractActorTest {
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
 
-                // capture snapshot reply should remove the snapshotted entries only
+                // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
 
@@ -1150,16 +1157,22 @@ public class RaftActorTest extends AbstractActorTest {
                 // create 5 entries in the log
                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
                 //set the snapshot index to 4 , 0 to 4 are snapshotted
                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+                //setting replicatedToAllIndex = 9, for the log to clear
+                leader.setReplicatedToAllIndex(9);
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // simulate a real snapshot
                 leaderActor.onReceiveCommand(new SendHeartBeat());
@@ -1182,7 +1195,7 @@ public class RaftActorTest extends AbstractActorTest {
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 
-                assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+                assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
index 42a7911be31f3411c3467435bec446dfef15ecb7..c133c0615f0c770feea88daa1b8b82fa5cf8f661 100644 (file)
@@ -1,8 +1,12 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -17,12 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
     private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
@@ -302,38 +300,52 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testFakeSnapshots() {
+    public void testPerformSnapshot() {
         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
-        AbstractRaftActorBehavior behavior = new Leader(context);
-        context.getTermInformation().update(1, "leader");
+        AbstractRaftActorBehavior abstractBehavior =  (AbstractRaftActorBehavior) createBehavior(context);
+        if (abstractBehavior instanceof Candidate) {
+            return;
+        }
 
-        //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+        context.getTermInformation().update(1, "test");
+
+        //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
         context.setLastApplied(0);
-        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(1, context.getReplicatedLog().size());
 
         //2 entries, lastApplied still 0, no purging.
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
         context.setLastApplied(0);
-        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(2, context.getReplicatedLog().size());
 
         //2 entries, lastApplied still 0, no purging.
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
         context.setLastApplied(1);
-        assertEquals(0, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(1, context.getReplicatedLog().size());
 
         //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
         context.setLastApplied(2);
-        assertEquals(1, behavior.fakeSnapshot(3, 1));
+        abstractBehavior.performSnapshotWithoutCapture(3);
+        assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(3, context.getReplicatedLog().size());
 
-
+        // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        context.setLastApplied(2);
+        abstractBehavior.performSnapshotWithoutCapture(1);
+        assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+        assertEquals(1, context.getReplicatedLog().size());
     }
 
+
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
         ActorRef actorRef, RaftRPC rpc) {
 
index 94b9698abf3a06f41527a83e5cec2461b2658266..4b0651a48eb07f4814bd990386bbe94d32159dfd 100644 (file)
@@ -1467,7 +1467,7 @@ public class ShardTest extends AbstractActorTest {
 
             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
 
-            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
+            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
             shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));