Merge "Bug 2948: Recovered log entries not applied after prior snapshot"
authorMoiz Raja <moraja@cisco.com>
Wed, 15 Apr 2015 18:26:20 +0000 (18:26 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 15 Apr 2015 18:26:20 +0000 (18:26 +0000)
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java

index 5dc8361cc43470b72165dfd2c7b51d6372ad3438..57603a505833af58b99d2916eaa5ed2a09a065c0 100644 (file)
@@ -122,17 +122,28 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
+        long lastUnappliedIndex = context.getLastApplied() + 1;
+
         if(log.isDebugEnabled()) {
-            log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getId(), context.getLastApplied() + 1, toIndex);
+            log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+                    context.getId(), lastUnappliedIndex, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
+        long lastApplied = lastUnappliedIndex - 1;
+        for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+            ReplicatedLogEntry logEntry = replicatedLog().get(i);
+            if(logEntry != null) {
+                lastApplied++;
+                batchRecoveredLogEntry(logEntry);
+            } else {
+                // Shouldn't happen but cover it anyway.
+                log.error("Log entry not found for index {}", i);
+                break;
+            }
         }
 
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
+        context.setLastApplied(lastApplied);
+        context.setCommitIndex(lastApplied);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
index 8121f75191e624cfd8595cd727985c73e1f3d5e7..f4f936bf161b2260f9cae4f397ded3af706f2294 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.protobuf.ByteString;
+import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -19,7 +20,6 @@ import org.slf4j.Logger;
 
 public class SnapshotManager implements SnapshotState {
 
-
     private final SnapshotState IDLE = new Idle();
     private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
@@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState {
 
     private SnapshotState currentState = IDLE;
     private CaptureSnapshot captureSnapshot;
+    private long lastSequenceNumber = -1;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState {
             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
 
             // send a CaptureSnapshot to self to make the expensive operation async.
+
+            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            if(targetFollower != null){
-                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
-            } else {
+            if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
             }
 
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
             context.getActor().tell(captureSnapshot, context.getActor());
 
             return true;
@@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState {
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot sn = Snapshot.create(snapshotBytes,
-                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
@@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState {
             persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(sequenceNumber);
+            persistenceProvider.deleteMessages(lastSequenceNumber);
 
+            lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
         }
 
index daa8f7768a44033ba61585249f294fd80df00925..7c182f04e433d63c074598e5344e740f1de31d90 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
 public class CaptureSnapshot {
     private final long lastAppliedIndex;
     private final long lastAppliedTerm;
@@ -16,14 +20,17 @@ public class CaptureSnapshot {
     private final boolean installSnapshotInitiated;
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
+            long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
+                unAppliedEntries, false);
     }
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
+            long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
+            List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
@@ -31,6 +38,7 @@ public class CaptureSnapshot {
         this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
+        this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
     }
 
     public long getLastAppliedIndex() {
@@ -61,6 +69,10 @@ public class CaptureSnapshot {
         return replicatedToAllTerm;
     }
 
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -68,7 +80,9 @@ public class CaptureSnapshot {
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
                 .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
-                .append(replicatedToAllTerm).append("]");
+                .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
         return builder.toString();
     }
+
+
 }
index 1289ed7f905f23919c32068a91fe0c29db0f3214..977cf0ef5eb60fc905ef7ce35177b46d23ed265b 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
@@ -20,6 +19,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +51,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
-        private volatile byte[] snapshot;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -112,19 +111,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         @Override
         public void createSnapshot(ActorRef actorRef) {
-            if(snapshot != null) {
-                getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+            try {
+                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
 
-        @Override
-        public void applyRecoverySnapshot(byte[] bytes) {
-        }
-
-        void setSnapshot(byte[] snapshot) {
-            this.snapshot = snapshot;
-        }
-
         public ActorRef collectorActor() {
             return collectorActor;
         }
@@ -158,6 +151,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long initialTerm = 5;
     protected long currentTerm;
 
+    protected List<Object> expSnapshotState = new ArrayList<>();
+
     @After
     public void tearDown() {
         InMemoryJournal.clear();
@@ -215,13 +210,20 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
-            int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+            int lastAppliedIndex, long lastTerm, long lastIndex)
+                    throws Exception {
         assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
         assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
-        assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size());
+        for(int i = 0; i < expSnapshotState.size(); i++) {
+            assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+        }
     }
 
     protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
index 53110b3583a4f7353e3d17b1ec7ac91bf8ec7891..586ca8cda05fac488d448949c0840ead847432df 100644 (file)
@@ -154,10 +154,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
     }
 
-
     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
-        LOG.info("{}: applyState called", persistenceId());
+        LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+        state.add(data);
     }
 
     @Override
@@ -235,7 +236,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return this.getId();
     }
 
-    private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
index 2ced72c531f40a5d6f886189e85f20cb8afe9452..ae9c784a556496a6edb8d14b82fe9e7e315ea61e 100644 (file)
@@ -124,7 +124,7 @@ public class RaftActorSnapshotMessageSupportTest {
     @Test
     public void testOnCaptureSnapshot() throws Exception {
 
-        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1));
+        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
 
         ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
         verify(mockSnapshotManager).create(procedure.capture());
index 5062f8f6e0c6d5875e949dcc38afeda5297f5827..82ebcd1fbd5e4293aa114f6102b0eaf7a0f839fa 100644 (file)
@@ -60,10 +60,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
 
+    static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
+
     private TestActorFactory factory;
 
     @Before
@@ -91,6 +95,8 @@ public class RaftActorTest extends AbstractActorTest {
 
     @Test
     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
+
         new JavaTestKit(getSystem()) {{
             String persistenceId = factory.generateActorId("follower-");
 
@@ -101,9 +107,9 @@ public class RaftActorTest extends AbstractActorTest {
             // log entry.
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
+            ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config)), persistenceId);
+                    peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
 
             watch(followerActor);
 
@@ -156,8 +162,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             //reinstate the actor
             TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+                    MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -176,6 +181,8 @@ public class RaftActorTest extends AbstractActorTest {
 
             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
+
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
     }
 
     @Test
@@ -275,7 +282,7 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
         mockRaftActor.handleCommand(applySnapshot);
 
-        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
+        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
         mockRaftActor.handleCommand(captureSnapshot);
 
@@ -863,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
-    private ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
new file mode 100644 (file)
index 0000000..a8f490e
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
+ *
+ * @author Thomas Pantelis
+ */
+public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    private MockPayload payload0;
+    private MockPayload payload1;
+
+    @Before
+    public void setup() {
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    @Test
+    public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshotReply to the leader.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshotReply.class);
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
+        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
+        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
+        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
+        // This is a side effect of trimming the persisted log to the sequence number captured at the time
+        // the snapshot was initiated.
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
+                payload3, payload4), leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    private void reinstateLeaderActor() {
+        killActor(leaderActor);
+
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        leaderActor.underlyingActor().waitForRecoveryComplete();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    private void send2InitialPayloads() {
+        waitUntilLeader(leaderActor);
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        payload0 = sendPayloadData(leaderActor, "zero");
+        payload1 = sendPayloadData(leaderActor, "one");
+
+        // Verify the leader applies the states.
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+        assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
+
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+    }
+}
index bd670fd581153e673ef145ca178211c6f7e60f86..c74705d13f6c682f30d2553665a34552f62bbc47 100644 (file)
@@ -14,7 +14,7 @@ import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     private MockPayload recoveredPayload0;
     private MockPayload recoveredPayload1;
     private MockPayload recoveredPayload2;
+    private MockPayload payload3;
     private MockPayload payload4;
     private MockPayload payload5;
     private MockPayload payload6;
     private MockPayload payload7;
 
     @Test
-    public void runTest() {
+    public void runTest() throws Exception {
         testLog.info("testReplicationAndSnapshots starting");
 
         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
@@ -55,7 +56,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
         recoveredPayload2 = new MockPayload("two");
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
-        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
 
         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
 
@@ -157,19 +158,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
      * scenario, the follower consensus and application of state is delayed until after the snapshot
      * completes.
+     * @throws Exception
      */
-    private void testFirstSnapshot() {
+    private void testFirstSnapshot() throws Exception {
         testLog.info("testFirstSnapshot starting");
 
-        byte[] snapshot = new byte[] {1,2,3,4};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(recoveredPayload0);
+        expSnapshotState.add(recoveredPayload1);
+        expSnapshotState.add(recoveredPayload2);
 
         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the payload.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        payload3 = sendPayloadData(leaderActor, "three");
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -185,7 +188,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
@@ -286,12 +289,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     /**
      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
      * consensus occurs and the leader applies the state.
+     * @throws Exception
      */
-    private void testSecondSnapshot() {
+    private void testSecondSnapshot() throws Exception {
         testLog.info("testSecondSnapshot starting");
 
-        byte[] snapshot = new byte[] {5,6,7,8};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
@@ -341,11 +347,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
-        // log entry (6).
+        expSnapshotState.add(payload7);
+
+        // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+        // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+        // when the snapshot is created (ie when the CaptureSnapshot is processed).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
@@ -404,6 +413,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
 
         leaderActor.underlyingActor().waitForRecoveryComplete();
 
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
index d4a9f7701b0bec82a775f14ee439463ce2da2f78..ff9b8ce63010f02c4774b33bf6c926396d9dc240 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
@@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload0);
+        expSnapshotState.add(payload1);
+        expSnapshotState.add(payload2);
+
         testLog.info("testInitialReplications complete");
     }
 
@@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
                 leader.getReplicatedToAllIndex());
 
-        leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the first payload - this should cause the first snapshot.
@@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
 
         testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
 
@@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
@@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
+        expSnapshotState.add(payload7);
+
         testLog.info("testSubsequentReplicationsAndSnapshots complete");
     }
 
@@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
                 leader.getReplicatedToAllIndex());
 
         leaderActor.underlyingActor().setMockTotalMemory(1000);
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
 
         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
@@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
+        expSnapshotState.add(payload8);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         payload9 = sendPayloadData(leaderActor, "nine", 201);
@@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
@@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+        expSnapshotState.add(payload10);
     }
 
     /**
@@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InstallSnapshot installSnapshot;
         InstallSnapshotReply installSnapshotReply;
 
-        byte[] snapshot = new byte[] {10};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload9);
 
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
@@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
-        assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+        //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
         installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
         assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
@@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
-        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
 
         // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
@@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
@@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+     * @throws Exception
      */
-    private void testFinalReplicationsAndSnapshot() {
+    private void testFinalReplicationsAndSnapshot() throws Exception {
         List<ApplyState> applyStates;
         ApplyState applyState;
 
         testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
 
-        byte[] snapshot = new byte[] {14};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
-
         // Send another payload - a snapshot should occur.
         payload11 = sendPayloadData(leaderActor, "eleven");
 
@@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);
index 5a0d5aed741d0e9d1a943305fc24a5f94dae573a..8ab762f78612a5b540d93512128c4e820b0bbc64 100644 (file)
@@ -69,6 +69,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
         doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
         ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@@ -384,6 +385,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommit(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -397,7 +400,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog).snapshotCommit();
 
-        verify(mockDataPersistenceProvider).deleteMessages(100L);
+        verify(mockDataPersistenceProvider).deleteMessages(50L);
 
         ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
 
@@ -438,6 +441,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingCommitMultipleTimesCausesNoHarm(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -453,7 +458,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
-        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
 
         verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
     }
index 0737d75a7f2d679ae240c996c87cbd882052da28..d482e28401dd3ef0edf87eebb1a9ca83eafbeb7f 100644 (file)
@@ -216,6 +216,7 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal != null) {
             synchronized (journal) {