Merge "Bug-2692:Real snapshots should use the replicatedToAllIndex for clearing in...
authorMoiz Raja <moraja@cisco.com>
Wed, 18 Feb 2015 22:38:50 +0000 (22:38 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 18 Feb 2015 22:38:51 +0000 (22:38 +0000)
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index d1c3fefee8309208a6df6fe9b539a10ee000ddef..e2aa16918e8ed0354f8543f8edd74b89474e4072 100644 (file)
@@ -41,7 +41,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     protected int adjustedIndex(long logEntryIndex) {
-        if(snapshotIndex < 0){
+        if (snapshotIndex < 0) {
             return (int) logEntryIndex;
         }
         return (int) (logEntryIndex - (snapshotIndex + 1));
@@ -134,6 +134,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
        return journal.size();
     }
 
+    @Override
+    public int dataSize() {
+        return dataSize;
+    }
+
     @Override
     public boolean isPresent(long logEntryIndex) {
         if (logEntryIndex > lastIndex()) {
@@ -200,6 +205,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
         dataSize = 0;
+        // need to recalc the datasize based on the entries left after precommit.
+        for(ReplicatedLogEntry logEntry : journal) {
+            dataSize += logEntry.size();
+        }
+
     }
 
     @Override
index 1f446c72f1ee9eccef904866236c1ec5ea235fb9..353d0b4a241f844f1338435e712202211263e3d7 100644 (file)
@@ -573,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
@@ -668,12 +668,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-        //be greedy and remove entries from in-mem journal which are in the snapshot
-        // and update snapshotIndex and snapshotTerm without waiting for the success,
+        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+        if (context.getReplicatedLog().dataSize() > dataThreshold) {
+            // if memory is less, clear the log based on lastApplied.
+            // this could/should only happen if one of the followers is down
+            // as normally we keep removing from the log when its replicated to all.
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
 
-        context.getReplicatedLog().snapshotPreCommit(
-            captureSnapshot.getLastAppliedIndex(),
-            captureSnapshot.getLastAppliedTerm());
+        } else {
+            // clear the log based on replicatedToAllIndex
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                    captureSnapshot.getReplicatedToAllTerm());
+        }
+        getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
 
         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
@@ -717,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
 
-                @Override public void apply(DeleteEntries param)
-                    throws Exception {
+                @Override
+                public void apply(DeleteEntries param)
+                        throws Exception {
                     //FIXME : Doing nothing for now
                     dataSize = 0;
-                    for(ReplicatedLogEntry entry : journal){
+                    for (ReplicatedLogEntry entry : journal) {
                         dataSize += entry.size();
                     }
                 }
@@ -735,11 +745,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(replicatedLogEntry, null);
         }
 
-        @Override
-        public int dataSize() {
-            return dataSize;
-        }
-
         public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
@@ -766,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex()+1;
+                        long journalSize = lastIndex() + 1;
 
                         if(!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
@@ -817,12 +822,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
-                            getSelf().tell(new CaptureSnapshot(
-                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
                                 null);
                             context.setSnapshotCaptureInitiated(true);
                         }
-                        if(callback != null){
+                        if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
                     }
index 80b7ad90d05dbb1e9fe45ec57a65a44dedfe4463..82d0839bee772bd8efba88a8d1392ab1d336ff1c 100644 (file)
@@ -145,14 +145,14 @@ public interface ReplicatedLog {
      * sets snapshot term
      * @param snapshotTerm
      */
-    public void setSnapshotTerm(long snapshotTerm);
+    void setSnapshotTerm(long snapshotTerm);
 
     /**
      * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
      * @param startIndex
      * @param endIndex
      */
-    public void clear(int startIndex, int endIndex);
+    void clear(int startIndex, int endIndex);
 
     /**
      * Handles all the bookkeeping in order to perform a rollback in the
@@ -160,20 +160,21 @@ public interface ReplicatedLog {
      * @param snapshotCapturedIndex
      * @param snapshotCapturedTerm
      */
-    public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
+    void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
 
     /**
      * Sets the Replicated log to state after snapshot success.
      */
-    public void snapshotCommit();
+    void snapshotCommit();
 
     /**
      * Restores the replicated log to a state in the event of a save snapshot failure
      */
-    public void snapshotRollback();
+    void snapshotRollback();
 
     /**
      * Size of the data in the log (in bytes)
      */
-    public int dataSize();
+    int dataSize();
+
 }
index d4dd3350f30b120bf965c885e3319152db9a2c38..a96b1e435cf88df201be0c344908143b29166334 100644 (file)
@@ -14,19 +14,23 @@ public class CaptureSnapshot {
     private long lastIndex;
     private long lastTerm;
     private boolean installSnapshotInitiated;
+    private long replicatedToAllIndex;
+    private long replicatedToAllTerm;
 
     public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false);
+        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
     }
 
     public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, boolean installSnapshotInitiated) {
+        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
         this.installSnapshotInitiated = installSnapshotInitiated;
+        this.replicatedToAllIndex = replicatedToAllIndex;
+        this.replicatedToAllTerm = replicatedToAllTerm;
     }
 
     public long getLastAppliedIndex() {
@@ -48,4 +52,12 @@ public class CaptureSnapshot {
     public boolean isInstallSnapshotInitiated() {
         return installSnapshotInitiated;
     }
+
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
+    public long getReplicatedToAllTerm() {
+        return replicatedToAllTerm;
+    }
 }
index 9e4f3b46c45b5de54458770b910b63f6c70305a8..94c38f6108eabf62ef9e41a3f484a6e2f915129d 100644 (file)
@@ -91,8 +91,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
-    private long replicatedToAllIndex = -1;
-
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -236,15 +234,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void purgeInMemoryLog() {
-        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        //find the lowest index across followers which has been replicated to all.
+        // lastApplied if there are no followers, so that we keep clearing the log for single-node
         // we would delete the in-mem log from that index on, in-order to minimize mem usage
         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
-        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
         for (FollowerLogInformation info : followerToLog.values()) {
             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
         }
 
-        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+        super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
     @Override
@@ -472,7 +471,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
 
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex >= followerNextIndex) {
+                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
@@ -507,7 +506,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), replicatedToAllIndex);
+            context.getCommitIndex(), super.getReplicatedToAllIndex());
 
         if(!entries.isEmpty()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
@@ -518,7 +517,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
-     * /**
      * Install Snapshot works as follows
      * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
      * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
@@ -533,10 +531,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * @param followerNextIndex
      */
     private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
-        }
-
         if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
                 context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
 
@@ -557,15 +551,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if (lastAppliedEntry != null) {
                     lastAppliedIndex = lastAppliedEntry.getIndex();
                     lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
                     lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
                     lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
                 }
 
                 boolean isInstallSnapshotInitiated = true;
-                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-                        actor());
+                long replicatedToAllIndex = super.getReplicatedToAllIndex();
+                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+                    isInstallSnapshotInitiated), actor());
                 context.setSnapshotCaptureInitiated(true);
             }
         }
@@ -605,8 +602,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         nextSnapshotChunk,
-                            followerToSnapshot.incrementChunkIndex(),
-                            followerToSnapshot.getTotalChunks(),
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
                         Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
index 075b2873e45332364c09aee83c49e6b23e40780c..8f433d529a8c12e81079a8463dd968e377dc330d 100644 (file)
@@ -58,12 +58,23 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected String leaderId = null;
 
+    private long replicatedToAllIndex = -1;
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
         this.LOG = context.getLogger();
     }
 
+    @Override
+    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+        this.replicatedToAllIndex = replicatedToAllIndex;
+    }
+
+    @Override
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
     /**
      * Derived classes should not directly handle AppendEntries messages it
      * should let the base class handle it first. Once the base class handles
@@ -423,17 +434,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     }
 
-    protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
 
+    /**
+     * Performs a snapshot with no capture on the replicated log.
+     * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+     *
+     * @param snapshotCapturedIndex
+     */
+    protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
         //  we would want to keep the lastApplied as its used while capturing snapshots
-        long tempMin = Math.min(minReplicatedToAllIndex,
-                (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+        long lastApplied = context.getLastApplied();
+        long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 
         if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
-            context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+            //use the term of the temp-min, since we check for isPresent, entry will not be null
+            ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+            context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
             context.getReplicatedLog().snapshotCommit();
-            return tempMin;
+            setReplicatedToAllIndex(tempMin);
         }
-        return currentReplicatedIndex;
     }
+
 }
index 8a0788702d81f3dfe04d713bcc99e527273c82d2..675543f2d1449514088b4b4868bd2b77a51a36bc 100644 (file)
@@ -255,7 +255,7 @@ public class Follower extends AbstractRaftActorBehavior {
             lastIndex(), lastTerm()), actor());
 
         if (!context.isSnapshotCaptureInitiated()) {
-            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+            super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
         return this;
index 064cd8b88c8b68152f04cdc25378c792eb263a7b..b766e0ce39fe9be1b847f54c625c802d036a7a0a 100644 (file)
@@ -50,4 +50,16 @@ public interface RaftActorBehavior extends AutoCloseable{
      * @return
      */
     String getLeaderId();
+
+    /**
+     * setting the index of the log entry which is replicated to all nodes
+     * @param replicatedToAllIndex
+     */
+    void setReplicatedToAllIndex(long replicatedToAllIndex);
+
+    /**
+     * getting the index of the log entry which is replicated to all nodes
+     * @return
+     */
+    long getReplicatedToAllIndex();
 }
index ffd8edfbe15fa3aad7a9f237a053b23017946337..885c3ab1094eb68d677fad1728fcbf6046e8f127 100644 (file)
@@ -20,6 +20,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+
 /**
 *
 */
@@ -130,11 +131,17 @@ public class AbstractReplicatedLogImplTest {
 
     @Test
     public void testSnapshotPreCommit() {
+        //add 4 more entries
         replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
 
+        //sending negative values should not cause any changes
+        replicatedLogImpl.snapshotPreCommit(-1, -1);
+        assertEquals(8, replicatedLogImpl.size());
+        assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+
         replicatedLogImpl.snapshotPreCommit(4, 3);
         assertEquals(3, replicatedLogImpl.size());
         assertEquals(4, replicatedLogImpl.getSnapshotIndex());
@@ -152,7 +159,31 @@ public class AbstractReplicatedLogImplTest {
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
 
+    }
+
+    @Test
+    public void testIsPresent() {
+        assertTrue(replicatedLogImpl.isPresent(0));
+        assertTrue(replicatedLogImpl.isPresent(1));
+        assertTrue(replicatedLogImpl.isPresent(2));
+        assertTrue(replicatedLogImpl.isPresent(3));
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("D")));
+        replicatedLogImpl.snapshotPreCommit(3, 2); //snapshot on 3
+        replicatedLogImpl.snapshotCommit();
+
+        assertFalse(replicatedLogImpl.isPresent(0));
+        assertFalse(replicatedLogImpl.isPresent(1));
+        assertFalse(replicatedLogImpl.isPresent(2));
+        assertFalse(replicatedLogImpl.isPresent(3));
+        assertTrue(replicatedLogImpl.isPresent(4));
+
+        replicatedLogImpl.snapshotPreCommit(4, 2); //snapshot on 4
+        replicatedLogImpl.snapshotCommit();
+        assertFalse(replicatedLogImpl.isPresent(4));
 
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("D")));
+        assertTrue(replicatedLogImpl.isPresent(5));
     }
 
     // create a snapshot for test
index 59046fd779fb1319ed4d8f48bb8f6911b9eae301..1cd8550be75677810a07bd79db13ebf272a4d250 100644 (file)
@@ -691,7 +691,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
 
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
 
@@ -737,7 +737,8 @@ public class RaftActorTest extends AbstractActorTest {
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
+                long replicatedToAllIndex = 1;
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
 
                 verify(mockRaftActor.delegate).createSnapshot();
 
@@ -749,13 +750,16 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(dataPersistenceProvider).deleteMessages(100);
 
-                assertEquals(2, mockRaftActor.getReplicatedLog().size());
+                assertEquals(3, mockRaftActor.getReplicatedLog().size());
+                assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
 
+                assertNotNull(mockRaftActor.getReplicatedLog().get(2));
                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
 
                 // Index 2 will not be in the log because it was removed due to snapshotting
-                assertNull(mockRaftActor.getReplicatedLog().get(2));
+                assertNull(mockRaftActor.getReplicatedLog().get(1));
+                assertNull(mockRaftActor.getReplicatedLog().get(0));
 
             }
         };
@@ -870,7 +874,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -962,7 +966,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
-                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1));
+                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+
                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(leaderActor.delegate).createSnapshot();
 
@@ -1040,19 +1045,21 @@ public class RaftActorTest extends AbstractActorTest {
 
                 followerActor.waitForInitializeBehaviorComplete();
 
-                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
                 Follower follower = new Follower(followerActor.getRaftActorContext());
                 followerActor.setCurrentBehavior(follower);
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
 
+                // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
 
-                // log as indices 0-5
+                // log has indices 0-5
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
                 //snapshot on 4
-                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1));
+                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+
                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(followerActor.delegate).createSnapshot();
 
@@ -1090,7 +1097,7 @@ public class RaftActorTest extends AbstractActorTest {
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
 
-                // capture snapshot reply should remove the snapshotted entries only
+                // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
 
@@ -1150,16 +1157,22 @@ public class RaftActorTest extends AbstractActorTest {
                 // create 5 entries in the log
                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
                 //set the snapshot index to 4 , 0 to 4 are snapshotted
                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+                //setting replicatedToAllIndex = 9, for the log to clear
+                leader.setReplicatedToAllIndex(9);
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // simulate a real snapshot
                 leaderActor.onReceiveCommand(new SendHeartBeat());
@@ -1182,7 +1195,7 @@ public class RaftActorTest extends AbstractActorTest {
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 
-                assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+                assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
index 42a7911be31f3411c3467435bec446dfef15ecb7..c133c0615f0c770feea88daa1b8b82fa5cf8f661 100644 (file)
@@ -1,8 +1,12 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -17,12 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
     private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
@@ -302,38 +300,52 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     @Test
-    public void testFakeSnapshots() {
+    public void testPerformSnapshot() {
         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
-        AbstractRaftActorBehavior behavior = new Leader(context);
-        context.getTermInformation().update(1, "leader");
+        AbstractRaftActorBehavior abstractBehavior =  (AbstractRaftActorBehavior) createBehavior(context);
+        if (abstractBehavior instanceof Candidate) {
+            return;
+        }
 
-        //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+        context.getTermInformation().update(1, "test");
+
+        //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
         context.setLastApplied(0);
-        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(1, context.getReplicatedLog().size());
 
         //2 entries, lastApplied still 0, no purging.
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
         context.setLastApplied(0);
-        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(2, context.getReplicatedLog().size());
 
         //2 entries, lastApplied still 0, no purging.
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
         context.setLastApplied(1);
-        assertEquals(0, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(1, context.getReplicatedLog().size());
 
         //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
         context.setLastApplied(2);
-        assertEquals(1, behavior.fakeSnapshot(3, 1));
+        abstractBehavior.performSnapshotWithoutCapture(3);
+        assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(3, context.getReplicatedLog().size());
 
-
+        // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        context.setLastApplied(2);
+        abstractBehavior.performSnapshotWithoutCapture(1);
+        assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+        assertEquals(1, context.getReplicatedLog().size());
     }
 
+
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
         ActorRef actorRef, RaftRPC rpc) {
 
index 94b9698abf3a06f41527a83e5cec2461b2658266..4b0651a48eb07f4814bd990386bbe94d32159dfd 100644 (file)
@@ -1467,7 +1467,7 @@ public class ShardTest extends AbstractActorTest {
 
             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
 
-            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
+            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
             shard.tell(capture, getRef());
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));