Merge "Bug-2590: Clustering : Minimize usage of in-memory journal"
authorMoiz Raja <moraja@cisco.com>
Fri, 6 Feb 2015 23:06:10 +0000 (23:06 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 6 Feb 2015 23:06:11 +0000 (23:06 +0000)
18 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java

index aa7b453..766b80e 100644 (file)
@@ -107,14 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private CaptureSnapshot captureSnapshot = null;
 
-    private volatile boolean hasSnapshotCaptureInitiated = false;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
 
-
-
     public RaftActor(String id, Map<String, String> peerAddresses) {
         this(id, peerAddresses, Optional.<ConfigParams>absent());
     }
@@ -436,7 +432,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
 
                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!hasSnapshotCaptureInitiated){
+                            if(!context.isSnapshotCaptureInitiated()){
                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
                                         raftContext.getTermInformation().getCurrentTerm());
                                 raftContext.getReplicatedLog().snapshotCommit();
@@ -693,7 +689,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         captureSnapshot = null;
-        hasSnapshotCaptureInitiated = false;
+        context.setSnapshotCaptureInitiated(false);
     }
 
     protected boolean hasFollowers(){
@@ -794,7 +790,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
                         // when a snaphsot is being taken, captureSnapshot != null
-                        if (hasSnapshotCaptureInitiated == false &&
+                        if (!context.isSnapshotCaptureInitiated() &&
                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
                                         dataSizeForCheck > dataThreshold)) {
 
@@ -827,7 +823,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             getSelf().tell(new CaptureSnapshot(
                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
                                 null);
-                            hasSnapshotCaptureInitiated = true;
+                            context.setSnapshotCaptureInitiated(true);
                         }
                         if(callback != null){
                             callback.apply(replicatedLogEntry);
index 0eb4b73..0e1f20b 100644 (file)
@@ -89,7 +89,7 @@ public interface RaftActorContext {
      *
      * @param replicatedLog
      */
-    public void setReplicatedLog(ReplicatedLog replicatedLog);
+    void setReplicatedLog(ReplicatedLog replicatedLog);
 
     /**
      * @return A representation of the log
@@ -137,7 +137,7 @@ public interface RaftActorContext {
      *
      * @param name
      */
-    public void removePeer(String name);
+    void removePeer(String name);
 
     /**
      * Given a peerId return the corresponding actor
@@ -165,5 +165,10 @@ public interface RaftActorContext {
     /**
      * @return ConfigParams
      */
-    public ConfigParams getConfigParams();
+    ConfigParams getConfigParams();
+
+    void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
+
+    boolean isSnapshotCaptureInitiated();
+
 }
index e4aef0a..5438fe7 100644 (file)
@@ -14,7 +14,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
 import akka.event.LoggingAdapter;
-
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -41,6 +40,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final ConfigParams configParams;
 
+    private boolean snapshotCaptureInitiated;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
@@ -130,6 +131,16 @@ public class RaftActorContextImpl implements RaftActorContext {
         return configParams;
     }
 
+    @Override
+    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+    }
+
+    @Override
+    public boolean isSnapshotCaptureInitiated() {
+        return snapshotCaptureInitiated;
+    }
+
     @Override public void addToPeers(String name, String address) {
         peerAddresses.put(name, address);
     }
index da1627b..e28e4b0 100644 (file)
@@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
+    private long replicatedToAllIndex = -1;
+
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -226,9 +228,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            purgeInMemoryLog();
+        }
+
         return this;
     }
 
+    private void purgeInMemoryLog() {
+        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        // we would delete the in-mem log from that index on, in-order to minimize mem usage
+        // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        for (FollowerLogInformation info : followerToLog.values()) {
+            minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+        }
+
+        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+    }
+
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
         final Iterator<ClientRequestTracker> it = trackerList.iterator();
@@ -460,7 +478,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             new AppendEntries(currentTerm(), context.getId(),
                 prevLogIndex(followerNextIndex),
                 prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex()).toSerializable(),
+                context.getCommitIndex(),
+                replicatedToAllIndex).toSerializable(),
             actor()
         );
     }
index dbeafe9..99824b0 100644 (file)
@@ -422,4 +422,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return numMajority;
 
     }
+
+    protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+
+        //  we would want to keep the lastApplied as its used while capturing snapshots
+        long tempMin = Math.min(minReplicatedToAllIndex,
+                (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+
+        if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+            context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+            context.getReplicatedLog().snapshotCommit();
+            return tempMin;
+        }
+        return currentReplicatedIndex;
+    }
 }
index 31b5efb..410b3c2 100644 (file)
@@ -254,6 +254,10 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+        }
+
         return this;
     }
 
index 8198106..97bcd6a 100644 (file)
@@ -50,14 +50,18 @@ public class AppendEntries extends AbstractRaftRPC {
     // leader's commitIndex
     private final long leaderCommit;
 
+    // index which has been replicated successfully to all followers, -1 if none
+    private final long replicatedToAllIndex;
+
     public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
         this.entries = entries;
         this.leaderCommit = leaderCommit;
+        this.replicatedToAllIndex = replicatedToAllIndex;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -102,6 +106,10 @@ public class AppendEntries extends AbstractRaftRPC {
         return leaderCommit;
     }
 
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb =
@@ -112,6 +120,7 @@ public class AppendEntries extends AbstractRaftRPC {
         sb.append(", prevLogTerm=").append(prevLogTerm);
         sb.append(", entries=").append(entries);
         sb.append(", leaderCommit=").append(leaderCommit);
+        sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
         sb.append('}');
         return sb.toString();
     }
@@ -203,7 +212,7 @@ public class AppendEntries extends AbstractRaftRPC {
             from.getPrevLogIndex(),
             from.getPrevLogTerm(),
             logEntryList,
-            from.getLeaderCommit());
+            from.getLeaderCommit(), -1);
 
         return to;
     }
index d53ccf2..ffd8edf 100644 (file)
@@ -128,6 +128,33 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
+    @Test
+    public void testSnapshotPreCommit() {
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
+
+        replicatedLogImpl.snapshotPreCommit(4, 3);
+        assertEquals(3, replicatedLogImpl.size());
+        assertEquals(4, replicatedLogImpl.getSnapshotIndex());
+
+        replicatedLogImpl.snapshotPreCommit(6, 3);
+        assertEquals(1, replicatedLogImpl.size());
+        assertEquals(6, replicatedLogImpl.getSnapshotIndex());
+
+        replicatedLogImpl.snapshotPreCommit(7, 3);
+        assertEquals(0, replicatedLogImpl.size());
+        assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+        //running it again on an empty list should not throw exception
+        replicatedLogImpl.snapshotPreCommit(7, 3);
+        assertEquals(0, replicatedLogImpl.size());
+        assertEquals(7, replicatedLogImpl.getSnapshotIndex());
+
+
+    }
+
     // create a snapshot for test
     public Map<Long, String> takeSnapshot(final int numEntries) {
         Map<Long, String> map = new HashMap<>(numEntries);
index cd852ea..9d3e5dc 100644 (file)
@@ -34,6 +34,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private ReplicatedLog replicatedLog;
     private Map<String, String> peerAddresses = new HashMap<>();
     private ConfigParams configParams;
+    private boolean snapshotCaptureInitiated;
 
     public MockRaftActorContext(){
         electionTerm = null;
@@ -185,6 +186,16 @@ public class MockRaftActorContext implements RaftActorContext {
         return configParams;
     }
 
+    @Override
+    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
+        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
+    }
+
+    @Override
+    public boolean isSnapshotCaptureInitiated() {
+        return snapshotCaptureInitiated;
+    }
+
     public void setConfigParams(ConfigParams configParams) {
         this.configParams = configParams;
     }
index 6b266d7..3089381 100644 (file)
@@ -1,17 +1,5 @@
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
@@ -41,6 +29,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -61,6 +50,8 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
@@ -70,6 +61,20 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class RaftActorTest extends AbstractActorTest {
 
 
@@ -86,6 +91,7 @@ public class RaftActorTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
+        private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private static final long serialVersionUID = 1L;
@@ -114,7 +120,8 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
+        public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                             DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
             this.delegate = mock(RaftActor.class);
@@ -133,6 +140,14 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
+        public void waitForInitializeBehaviorComplete() {
+            try {
+                assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
         public List<Object> getState() {
             return state;
         }
@@ -176,6 +191,12 @@ public class RaftActorTest extends AbstractActorTest {
             recoveryComplete.countDown();
         }
 
+        @Override
+        protected void initializeBehavior() {
+            super.initializeBehavior();
+            initializeBehaviorComplete.countDown();
+        }
+
         @Override
         protected void applyRecoverySnapshot(byte[] bytes) {
             delegate.applyRecoverySnapshot(bytes);
@@ -339,10 +360,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 // 4 messages as part of snapshot, which are applied to state
             ByteString snapshotBytes  = fromObject(Arrays.asList(
-                        new MockRaftActorContext.MockPayload("A"),
-                        new MockRaftActorContext.MockPayload("B"),
-                        new MockRaftActorContext.MockPayload("C"),
-                        new MockRaftActorContext.MockPayload("D")));
+                    new MockRaftActorContext.MockPayload("A"),
+                    new MockRaftActorContext.MockPayload("B"),
+                    new MockRaftActorContext.MockPayload("C"),
+                    new MockRaftActorContext.MockPayload("D")));
 
             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
@@ -909,6 +930,195 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "leader1";
+
+                ActorRef followerActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1", followerActor1.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor leaderActor = mockActorRef.underlyingActor();
+                leaderActor.getRaftActorContext().setCommitIndex(4);
+                leaderActor.getRaftActorContext().setLastApplied(4);
+                leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                leaderActor.waitForInitializeBehaviorComplete();
+
+                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
+                Leader leader = new Leader(leaderActor.getRaftActorContext());
+                leaderActor.setCurrentBehavior(leader);
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
+                leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+                verify(leaderActor.delegate).createSnapshot();
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                //fake snapshot on index 5
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                //fake snapshot on index 6
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+                assertEquals(8, leaderActor.getReplicatedLog().size());
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                // capture snapshot reply should remove the snapshotted entries only
+                assertEquals(3, leaderActor.getReplicatedLog().size());
+                assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
+
+                // add another non-replicated entry
+                leaderActor.getReplicatedLog().append(
+                        new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
+
+                //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
+                leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
+                assertEquals(2, leaderActor.getReplicatedLog().size());
+                assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "follower1";
+
+                ActorRef leaderActor1 =
+                        getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+                config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("leader", leaderActor1.path().toString());
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                        MockRaftActor.props(persistenceId, peerAddresses,
+                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor followerActor = mockActorRef.underlyingActor();
+                followerActor.getRaftActorContext().setCommitIndex(4);
+                followerActor.getRaftActorContext().setLastApplied(4);
+                followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
+
+                followerActor.waitForInitializeBehaviorComplete();
+
+                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+                Follower follower = new Follower(followerActor.getRaftActorContext());
+                followerActor.setCurrentBehavior(follower);
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+                MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
+                followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
+
+                // log as indices 0-5
+                assertEquals(6, followerActor.getReplicatedLog().size());
+
+                //snapshot on 4
+                followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
+                followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
+                verify(followerActor.delegate).createSnapshot();
+
+                assertEquals(6, followerActor.getReplicatedLog().size());
+
+                //fake snapshot on index 6
+                List<ReplicatedLogEntry> entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                                        new MockRaftActorContext.MockPayload("foo-6"))
+                        );
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
+                assertEquals(7, followerActor.getReplicatedLog().size());
+
+                //fake snapshot on index 7
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+                entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
+                                        new MockRaftActorContext.MockPayload("foo-7"))
+                        );
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
+                assertEquals(8, followerActor.getReplicatedLog().size());
+
+                assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
+
+
+                ByteString snapshotBytes  = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("foo-0"),
+                        new MockRaftActorContext.MockPayload("foo-1"),
+                        new MockRaftActorContext.MockPayload("foo-2"),
+                        new MockRaftActorContext.MockPayload("foo-3"),
+                        new MockRaftActorContext.MockPayload("foo-4")));
+                followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
+                assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                // capture snapshot reply should remove the snapshotted entries only
+                assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
+                assertEquals(7, followerActor.getReplicatedLog().lastIndex());
+
+                entries =
+                        Arrays.asList(
+                                (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
+                                        new MockRaftActorContext.MockPayload("foo-7"))
+                        );
+                // send an additional entry 8 with leaderCommit = 7
+                followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
+
+                // 7 and 8, as lastapplied is 7
+                assertEquals(2, followerActor.getReplicatedLog().size());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
     private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
index 3893018..42a7911 100644 (file)
@@ -74,7 +74,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             context.getTermInformation().update(1000, "test");
 
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -131,7 +131,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
 
                 AppendEntries appendEntries =
-                    new AppendEntries(2, "leader-1", -1, 1, entries, 0);
+                    new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
 
                 RaftActorBehavior behavior = createBehavior(context);
 
@@ -301,6 +301,39 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testFakeSnapshots() {
+        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
+        AbstractRaftActorBehavior behavior = new Leader(context);
+        context.getTermInformation().update(1, "leader");
+
+        //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        context.setLastApplied(0);
+        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        assertEquals(1, context.getReplicatedLog().size());
+
+        //2 entries, lastApplied still 0, no purging.
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setLastApplied(0);
+        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        assertEquals(2, context.getReplicatedLog().size());
+
+        //2 entries, lastApplied still 0, no purging.
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setLastApplied(1);
+        assertEquals(0, behavior.fakeSnapshot(0, -1));
+        assertEquals(1, context.getReplicatedLog().size());
+
+        //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+        context.setLastApplied(2);
+        assertEquals(1, behavior.fakeSnapshot(3, 1));
+        assertEquals(3, context.getReplicatedLog().size());
+
+
+    }
+
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
         ActorRef actorRef, RaftRPC rpc) {
 
@@ -347,7 +380,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     }
 
     protected AppendEntries createAppendEntriesWithNewerTerm() {
-        return new AppendEntries(100, "leader-1", 0, 0, null, 1);
+        return new AppendEntries(100, "leader-1", 0, 0, null, 1, -1);
     }
 
     protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
index 485ee4b..0dc68c2 100644 (file)
@@ -3,6 +3,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -16,9 +19,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
@@ -167,7 +168,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
-                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0));
+                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
                         // do not put code outside this method, will run afterwards
index a04d6ae..719a825 100644 (file)
@@ -181,7 +181,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
             // The new commitIndex is 101
             AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 100, 1, entries, 101);
+                new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
 
             RaftActorBehavior raftBehavior =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
@@ -217,7 +217,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // AppendEntries is now sent with a bigger term
             // this will set the receivers term to be the same as the sender's term
             AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -293,7 +293,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // This will not work for a Candidate because as soon as a Candidate
             // is created it increments the term
             AppendEntries appendEntries =
-                new AppendEntries(1, "leader-1", 2, 1, entries, 4);
+                new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -373,7 +373,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             // This will not work for a Candidate because as soon as a Candidate
             // is created it increments the term
             AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 1, 1, entries, 3);
+                new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -446,7 +446,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
 
             AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
 
             RaftActorBehavior behavior = createBehavior(context);
 
@@ -502,7 +502,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                     new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
 
             AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
 
             RaftActorBehavior behavior = createBehavior(context);
 
index abde51b..5f5d73d 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft.messages;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -21,6 +19,9 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
 /**
  * Unit tests for AppendEntries.
  *
@@ -34,7 +35,7 @@ public class AppendEntriesTest {
 
         ReplicatedLogEntry entry2 = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload2"));
 
-        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L);
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, -1);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
@@ -44,7 +45,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromSerializable() {
         AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L,
-                Collections.<ReplicatedLogEntry>emptyList(), 10L);
+                Collections.<ReplicatedLogEntry>emptyList(), 10L, -1);
 
         assertSame("toSerializable", entries, entries.toSerializable());
         assertSame("fromSerializable", entries,
@@ -54,7 +55,7 @@ public class AppendEntriesTest {
     @Test
     public void testToAndFromLegacySerializable() {
         ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload"));
-        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L);
+        AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L, -1);
 
         Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION);
         Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries);
@@ -71,6 +72,7 @@ public class AppendEntriesTest {
         assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
         assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex());
         assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm());
+        assertEquals("getReplicatedToAllIndex", expected.getReplicatedToAllIndex(), actual.getReplicatedToAllIndex());
 
         assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size());
         Iterator<ReplicatedLogEntry> iter = expected.getEntries().iterator();
index cf37cbd..21a0cb6 100644 (file)
@@ -30,11 +30,11 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Received message {}", messageType);
+//            LOG.debug("Received message {}", messageType);
         }
         handleReceive(message);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Done handling message {}", messageType);
+//            LOG.debug("Done handling message {}", messageType);
         }
     }
 
index 5b7002e..ce7d630 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang.SerializationUtils;
@@ -24,6 +22,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
 @Deprecated
 public class CompositeModificationByteStringPayloadTest {
 
@@ -69,6 +70,6 @@ public class CompositeModificationByteStringPayloadTest {
 
         entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10, -1).toSerializable());
     }
 }
index a55f6b8..90b9788 100644 (file)
@@ -55,7 +55,7 @@ public class CompositeModificationPayloadTest {
         });
 
         AppendEntries appendEntries =
-            new AppendEntries(1, "member-1", 0, 100, entries, 1);
+            new AppendEntries(1, "member-1", 0, 100, entries, 1, -1);
 
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
index 79c1bb4..28fc6b0 100644 (file)
@@ -98,7 +98,7 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
     }
 
     public static AppendEntries keyValueAppendEntries() {
@@ -123,6 +123,6 @@ public class Client {
             }
         });
 
-        return new AppendEntries(1, "member-1", 0, 100, modification, 1);
+        return new AppendEntries(1, "member-1", 0, 100, modification, 1, -1);
     }
 }