Bug-2590: Clustering : Minimize usage of in-memory journal 15/14615/10
authorKamal Rameshan <kramesha@cisco.com>
Thu, 29 Jan 2015 19:44:30 +0000 (11:44 -0800)
committerKamal Rameshan <kramesha@cisco.com>
Fri, 6 Feb 2015 19:27:07 +0000 (19:27 +0000)
In order to minimize the memory usage of the in-memory journal, we can remove the entries from the Leader's journal
once it has been successfully replicated to ALL its followers.
This does not intefere with snapshots, as we capture snapshots on demand.

The followers follow the leader in cleaning the in-memory journal, there by ensuring that all the journals have more or less same entries.
This is done by the leader passing its replicatedToAllIndex as part of the AppendEntries.

Change-Id: I579a1f90d3c4e5d6be4ce699072688788b07bd48
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
18 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java

index aa7b4533b7758f7a3f457ea48b1ae72cc6d94162..766b80e73dd12c890df3ed493e397a7cd144aab4 100644 (file)
@@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private CaptureSnapshot captureSnapshot = null;
 
-    private volatile boolean hasSnapshotCaptureInitiated = false;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
 
-
-
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -436,7 +432,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!hasSnapshotCaptureInitiated){
+                            if(!context.isSnapshotCaptureInitiated()){
                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
                                         raftContext.getTermInformation().getCurrentTerm());
                                 raftContext.getReplicatedLog().snapshotCommit();
@@ -693,7 +689,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         captureSnapshot = null;
-        hasSnapshotCaptureInitiated = false;
+        context.setSnapshotCaptureInitiated(false);
     }
 
     protected boolean hasFollowers(){
@@ -794,7 +790,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
                         // when a snaphsot is being taken, captureSnapshot != null
-                        if (hasSnapshotCaptureInitiated == false &&
+                        if (!context.isSnapshotCaptureInitiated() &&
                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
                                         dataSizeForCheck > dataThreshold)) {
 
@@ -827,7 +823,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             getSelf().tell(new CaptureSnapshot(
                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
                                 null);
-                            hasSnapshotCaptureInitiated = true;
+                            context.setSnapshotCaptureInitiated(true);
                         }
                         if(callback != null){
                             callback.apply(replicatedLogEntry);
index 0eb4b7377976526ac48eaefa9a9922b6a70c0076..0e1f20b24681ed6a0cd0644b513251114f225745 100644 (file)
@@ -89,7 +89,7 @@ public interface RaftActorContext {
      *
      * @param replicatedLog
      */
-    public void setReplicatedLog(ReplicatedLog replicatedLog);
+    void setReplicatedLog(ReplicatedLog replicatedLog);
 
     /**
      * @return A representation of the log
@@ -137,7 +137,7 @@ public interface RaftActorContext {
      *
      * @param name
      */
-    public void removePeer(String name);
+    void removePeer(String name);
 
     /**
      * Given a peerId return the corresponding actor
@@ -165,5 +165,10 @@ public interface RaftActorContext {
     /**
      * @return ConfigParams
      */
-    public ConfigParams getConfigParams();
+    ConfigParams getConfigParams();
+
+    void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
+
+    boolean isSnapshotCaptureInitiated();
+
 }
index e4aef0a8445d2400e54fff5c1d09c9044747134f..5438fe7c4840ed4c91f17c98ebc1022ace720d7c 100644 (file)
@@ -14,7 +14,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
 import akka.event.LoggingAdapter;
-
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -41,6 +40,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final ConfigParams configParams;
 
+    private boolean snapshotCaptureInitiated;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
@@ -130,6 +131,16 @@ public class RaftActorContextImpl implements RaftActorContext {
         return configParams;
     }
 
+    @Override
+    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+    }
+
+    @Override
+    public boolean isSnapshotCaptureInitiated() {
+        return snapshotCaptureInitiated;
+    }
+
     @Override public void addToPeers(String name, String address) {
         peerAddresses.put(name, address);
     }
index da1627b98e7e4e8204385914795300ce073a71ad..e28e4b066d372ee54594ecaf9fe0e5259ad4cdfd 100644 (file)
@@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
+    private long replicatedToAllIndex = -1;
+
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -226,9 +228,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            purgeInMemoryLog();
+        }
+
         return this;
     }
 
+    private void purgeInMemoryLog() {
+        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        // we would delete the in-mem log from that index on, in-order to minimize mem usage
+        // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        for (FollowerLogInformation info : followerToLog.values()) {
+            minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+        }
+
+        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+    }
+
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
         final Iterator<ClientRequestTracker> it = trackerList.iterator();
@@ -460,7 +478,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             new AppendEntries(currentTerm(), context.getId(),
                 prevLogIndex(followerNextIndex),
                 prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex()).toSerializable(),
+                context.getCommitIndex(),
+                replicatedToAllIndex).toSerializable(),
             actor()
         );
     }
index dbeafe9eb8b2fce467451eb4594c6c1be913797e..99824b0bb4e6235a56d12cb719384fee85fd309a 100644 (file)
@@ -422,4 +422,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return numMajority;
 
     }
+
+    protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+
+        //  we would want to keep the lastApplied as its used while capturing snapshots
+        long tempMin = Math.min(minReplicatedToAllIndex,
+                (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+
+        if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+            context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+            context.getReplicatedLog().snapshotCommit();
+            return tempMin;
+        }
+        return currentReplicatedIndex;
+    }
 }
index 31b5efbe3878df73f74d46d90d018751dca67bbf..410b3c266c87066cef3828732f62ca154a580df3 100644 (file)
@@ -254,6 +254,10 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+        }
+
         return this;
     }
 
index 81981062177510641c08edb243016075f388ae0f..97bcd6a708b7c8f11646b026e7e1653a2713ff40 100644 (file)
@@ -50,14 +50,18 @@ public class AppendEntries extends AbstractRaftRPC {
     // leader's commitIndex
     private final long leaderCommit;
 
+    // index which has been replicated successfully to all followers, -1 if none
+    private final long replicatedToAllIndex;
+
     public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
         this.entries = entries;
         this.leaderCommit = leaderCommit;
+        this.replicatedToAllIndex = replicatedToAllIndex;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -102,6 +106,10 @@ public class AppendEntries extends AbstractRaftRPC {
         return leaderCommit;
     }
 
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb =
@@ -112,6 +120,7 @@ public class AppendEntries extends AbstractRaftRPC {
         sb.append(", prevLogTerm=").append(prevLogTerm);
         sb.append(", entries=").append(entries);
         sb.append(", leaderCommit=").append(leaderCommit);
+        sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
         sb.append('}');
         return sb.toString();
     }
@@ -203,7 +212,7 @@ public class AppendEntries extends AbstractRaftRPC {
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
-            from.getLeaderCommit());
+            from.getLeaderCommit(), -1);
 
         return to;
     }
index d53ccf25002dbbf407d2e4dc5dc35c0ec231c590..ffd8edfbe15fa3aad7a9f237a053b23017946337 100644 (file)
@@ -128,6 +128,33 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
+    @Test
+    public void testSnapshotPreCommit() {
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+        replicatedLogImpl.snapshotPreCommit(4, 3);
+        assertEquals(3, replicatedLogImpl.size());
+        assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+
+        replicatedLogImpl.snapshotPreCommit(6, 3);
+        assertEquals(1, replicatedLogImpl.size());
+        assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+
+        replicatedLogImpl.snapshotPreCommit(7, 3);
+        assertEquals(0, replicatedLogImpl.size());
+        assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+        //running it again on an empty list should not throw exception
+        replicatedLogImpl.snapshotPreCommit(7, 3);
+        assertEquals(0, replicatedLogImpl.size());
+        assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+
+    }
+
     // create a snapshot for test
     public Map<Long, String> takeSnapshot(final int numEntries) {
         Map<Long, String> map = new HashMap<>(numEntries);
index cd852eaae2d247f8b484f435b54e73fd0b97f1c9..9d3e5dcb12da55ee474f27e055487ff690321def 100644 (file)
@@ -34,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap<>();
     private ConfigParams configParams;
+    private boolean snapshotCaptureInitiated;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -185,6 +186,16 @@ public class MockRaftActorContext implements RaftActorContext {
         return configParams;
     }
 
+    @Override
+    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+    }
+
+    @Override
+    public boolean isSnapshotCaptureInitiated() {
+        return snapshotCaptureInitiated;
+    }
+
     public void setConfigParams(ConfigParams configParams) {
         this.configParams = configParams;
     }
index 6b266d710e4aa44f793c4ed2bc809347944f1c15..30893810f5a9bee6542fff36f81694380844a06c 100644 (file)
@@ -1,17 +1,5 @@
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
@@ -41,6 +29,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -61,6 +50,8 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
@@ -70,6 +61,20 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class RaftActorTest extends AbstractActorTest {
 
 
@@ -86,6 +91,7 @@ public class RaftActorTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
+        private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private static final long serialVersionUID = 1L;
@@ -114,7 +120,8 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                             DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
             this.delegate = mock(RaftActor.class);
@@ -133,6 +140,14 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
+        public void waitForInitializeBehaviorComplete() {
+            try {
+                assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
         public List<Object> getState() {
             return state;
         }
@@ -176,6 +191,12 @@ public class RaftActorTest extends AbstractActorTest {
             recoveryComplete.countDown();
         }
 
+        @Override
+        protected void initializeBehavior() {
+            super.initializeBehavior();
+            initializeBehaviorComplete.countDown();
+        }
+
         @Override
         protected void applyRecoverySnapshot(byte[] bytes) {
             delegate.applyRecoverySnapshot(bytes);
@@ -339,10 +360,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 // 4 messages as part of snapshot, which are applied to state
             ByteString snapshotBytes  = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+                    new MockRaftActorContext.MockPayload("A"),
+                    new MockRaftActorContext.MockPayload("B"),
+                    new MockRaftActorContext.MockPayload("C"),
+                    new MockRaftActorContext.MockPayload("D")));
 
             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
@@ -909,6 +930,195 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "leader1";
+
+                ActorRef followerActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1", followerActor1.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor leaderActor = mockActorRef.underlyingActor();
+                leaderActor.getRaftActorContext().setCommitIndex(4);
+                leaderActor.getRaftActorContext().setLastApplied(4);
+                leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                leaderActor.waitForInitializeBehaviorComplete();
+
+                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
+                Leader leader = new Leader(leaderActor.getRaftActorContext());
+                leaderActor.setCurrentBehavior(leader);
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
+                leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+                verify(leaderActor.delegate).createSnapshot();
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                //fake snapshot on index 5
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                //fake snapshot on index 6
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                // capture snapshot reply should remove the snapshotted entries only
+                assertEquals(3, leaderActor.getReplicatedLog().size());
+                assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+
+                // add another non-replicated entry
+                leaderActor.getReplicatedLog().append(
+                        new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+
+                //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
+                assertEquals(2, leaderActor.getReplicatedLog().size());
+                assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "follower1";
+
+                ActorRef leaderActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("leader", leaderActor1.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor followerActor = mockActorRef.underlyingActor();
+                followerActor.getRaftActorContext().setCommitIndex(4);
+                followerActor.getRaftActorContext().setLastApplied(4);
+                followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                followerActor.waitForInitializeBehaviorComplete();
+
+                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+                Follower follower = new Follower(followerActor.getRaftActorContext());
+                followerActor.setCurrentBehavior(follower);
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+
+                // log as indices 0-5
+                assertEquals(6, followerActor.getReplicatedLog().size());
+
+                //snapshot on 4
+                followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
+                followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+                verify(followerActor.delegate).createSnapshot();
+
+                assertEquals(6, followerActor.getReplicatedLog().size());
+
+                //fake snapshot on index 6
+                List<ReplicatedLogEntry> entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                                        new MockRaftActorContext.MockPayload("foo-6"))
+                        );
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
+                assertEquals(7, followerActor.getReplicatedLog().size());
+
+                //fake snapshot on index 7
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+                entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                                        new MockRaftActorContext.MockPayload("foo-7"))
+                        );
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
+                assertEquals(8, followerActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                // capture snapshot reply should remove the snapshotted entries only
+                assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+                assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+
+                entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+                                        new MockRaftActorContext.MockPayload("foo-7"))
+                        );
+                // send an additional entry 8 with leaderCommit = 7
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
+
+                // 7 and 8, as lastapplied is 7
+                assertEquals(2, followerActor.getReplicatedLog().size());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index 38930180082fdcb80958c1b9adbe04f5062769bf..42a7911be31f3411c3467435bec446dfef15ecb7 100644 (file)
@@ -74,7 +74,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             context.getTermInformation().update(1000, "test");
 
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -131,7 +131,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
 
                 AppendEntries appendEntries =
-                    new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+                    new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
 
                 RaftActorBehavior behavior = createBehavior(context);
 
@@ -301,6 +301,39 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testFakeSnapshots() {
+        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+        AbstractRaftActorBehavior behavior = new Leader(context);
+        context.getTermInformation().update(1, "leader");
+
+        //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        context.setLastApplied(0);
+        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        assertEquals(1, context.getReplicatedLog().size());
+
+        //2 entries, lastApplied still 0, no purging.
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setLastApplied(0);
+        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        assertEquals(2, context.getReplicatedLog().size());
+
+        //2 entries, lastApplied still 0, no purging.
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setLastApplied(1);
+        assertEquals(0, behavior.fakeSnapshot(0, -1));
+        assertEquals(1, context.getReplicatedLog().size());
+
+        //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+        context.setLastApplied(2);
+        assertEquals(1, behavior.fakeSnapshot(3, 1));
+        assertEquals(3, context.getReplicatedLog().size());
+
+
+    }
+
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
         ActorRef actorRef, RaftRPC rpc) {
 
@@ -347,7 +380,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm() {
-        return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
index 485ee4b316d2506b35d31cfed5f6aa252f16bff3..0dc68c2461c2235b22e663b39ad51220e96c80b5 100644 (file)
@@ -3,6 +3,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -16,9 +19,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
@@ -167,7 +168,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
-                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0));
+                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
                         // do not put code outside this method, will run afterwards
index a04d6aeb556cd2f84ffb10ac23302c9e5928451b..719a8256a0757f406777a8d03b52fab879009c74 100644 (file)
@@ -181,7 +181,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
 
             RaftActorBehavior raftBehavior =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
@@ -217,7 +217,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // AppendEntries is now sent with a bigger term
             // this will set the receivers term to be the same as the sender's term
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -293,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // This will not work for a Candidate because as soon as a Candidate
             // is created it increments the term
             AppendEntries appendEntries =
-                new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+                new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -373,7 +373,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // This will not work for a Candidate because as soon as a Candidate
             // is created it increments the term
             AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+                new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -446,7 +446,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
 
             AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -502,7 +502,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
 
             AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
 
             RaftActorBehavior behavior = createBehavior(context);
 
index abde51bde592951b404835b7d3529b5c8150257a..5f5d73dbe6b126028fdce788d29f3a8cf7cc75c3 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft.messages;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -21,6 +19,9 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
 /**
  * Unit tests for AppendEntries.
  *
@@ -34,7 +35,7 @@ public class AppendEntriesTest {
 
         ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
 
-        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
@@ -44,7 +45,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromSerializable() {
         AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
-                Collections.<ReplicatedLogEntry>emptyList(), 10L);
+                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
 
         assertSame("toSerializable", entries, entries.toSerializable());
         assertSame("fromSerializable", entries,
@@ -54,7 +55,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromLegacySerializable() {
         ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
-        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
 
         Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
         Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
@@ -71,6 +72,7 @@ public class AppendEntriesTest {
         assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
         assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
         assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+        assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
 
         assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
         Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
index cf37cbdd005effaacb2294edd9308b8598e9788f..21a0cb6a889a78cf31910198eadc79f6c53f10e3 100644 (file)
@@ -30,11 +30,11 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received message {}", messageType);
+//            LOG.debug("Received message {}", messageType);
         }
         handleReceive(message);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Done handling message {}", messageType);
+//            LOG.debug("Done handling message {}", messageType);
         }
     }
 
index 5b7002eda2aafe923c0ad5d3b20addfd095b8a3e..ce7d6303ad13985fa4c08b4fd152cdae52cbd360 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang.SerializationUtils;
@@ -24,6 +22,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
 @Deprecated
 public class CompositeModificationByteStringPayloadTest {
 
@@ -69,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest {
 
         entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
     }
 }
index a55f6b865d127b9f6f4abe33cd9cbb227ab2839f..90b978821f2a10860e6006a4047a97ed7b667fa3 100644 (file)
@@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest {
         });
 
         AppendEntries appendEntries =
-            new AppendEntries(1, "member-1", 0, 100, entries, 1);
+            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
 
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
index 79c1bb4720e9790599b99bb17c6cf6d823275b86..28fc6b0f57ed950479ba2737a4a2aef26e450475 100644 (file)
@@ -98,7 +98,7 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
     }
 
     public static AppendEntries keyValueAppendEntries() {
@@ -123,6 +123,6 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
     }
 }