Bug 2948: Recovered log entries not applied after prior snapshot 13/18113/5
authorTom Pantelis <tpanteli@brocade.com>
Sat, 4 Apr 2015 03:39:52 +0000 (23:39 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 14 Apr 2015 00:51:14 +0000 (20:51 -0400)
Modified SnapshotManager#capture to obtain and cache the last journal sequence
number. On commit, the cached sequence number is used to delete
messages. This preserves any ApplyJournalEntries and ReplicatedLogEntry
messages that occur after the capture is initiated.

As a result of his change, we also have to obtain the unapplied entries
at the time the snapahot is initiated and not when we persist.
Otherwise, log entries persisted after the capture is initiated and
before it's persisted would be included in the snapshot's unapplied
list. On recovery, both entries would be recovered and added to the in-memory
journal resulting in duplicate entries.

These changes may result in a subsequent modification that is applied
after the snapshot is initiated to be included in the snapshot and also
persisted in the journal or included as unapplied in the snapshot. On
recovery, the modification entry would be redundantly applied. This could
result in data tree errors, eg if the modification was a delete, the
redundant apply would cause an exception due to the non-existent node.
We'll have to live with that - I think the only way to prevent it is if
we create the snapshot synchronously. Note this could also have occurred
before these changes.

Change-Id: I745d94f5417e17627c1c4d53be8d6fdf01587d35
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java

index 5dc8361..57603a5 100644 (file)
@@ -122,17 +122,28 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
+        long lastUnappliedIndex = context.getLastApplied() + 1;
+
         if(log.isDebugEnabled()) {
-            log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    context.getId(), context.getLastApplied() + 1, toIndex);
+            log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
+                    context.getId(), lastUnappliedIndex, toIndex);
         }
 
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
+        long lastApplied = lastUnappliedIndex - 1;
+        for (long i = lastUnappliedIndex; i <= toIndex; i++) {
+            ReplicatedLogEntry logEntry = replicatedLog().get(i);
+            if(logEntry != null) {
+                lastApplied++;
+                batchRecoveredLogEntry(logEntry);
+            } else {
+                // Shouldn't happen but cover it anyway.
+                log.error("Log entry not found for index {}", i);
+                break;
+            }
         }
 
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
+        context.setLastApplied(lastApplied);
+        context.setCommitIndex(lastApplied);
     }
 
     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
index 8121f75..f4f936b 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.protobuf.ByteString;
+import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -19,7 +20,6 @@ import org.slf4j.Logger;
 
 public class SnapshotManager implements SnapshotState {
 
-
     private final SnapshotState IDLE = new Idle();
     private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
@@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState {
 
     private SnapshotState currentState = IDLE;
     private CaptureSnapshot captureSnapshot;
+    private long lastSequenceNumber = -1;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState {
             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
 
             // send a CaptureSnapshot to self to make the expensive operation async.
+
+            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            if(targetFollower != null){
-                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
-            } else {
+            if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
             }
 
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
             context.getActor().tell(captureSnapshot, context.getActor());
 
             return true;
@@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState {
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot sn = Snapshot.create(snapshotBytes,
-                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
@@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState {
             persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(sequenceNumber);
+            persistenceProvider.deleteMessages(lastSequenceNumber);
 
+            lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
         }
 
index daa8f77..7c182f0 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
 public class CaptureSnapshot {
     private final long lastAppliedIndex;
     private final long lastAppliedTerm;
@@ -16,14 +20,17 @@ public class CaptureSnapshot {
     private final boolean installSnapshotInitiated;
     private final long replicatedToAllIndex;
     private final long replicatedToAllTerm;
+    private final List<ReplicatedLogEntry> unAppliedEntries;
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm,
+            long replicatedToAllIndex, long replicatedToAllTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm,
+                unAppliedEntries, false);
     }
 
-    public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
+    public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex,
+            long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm,
+            List<ReplicatedLogEntry> unAppliedEntries, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
@@ -31,6 +38,7 @@ public class CaptureSnapshot {
         this.installSnapshotInitiated = installSnapshotInitiated;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.replicatedToAllTerm = replicatedToAllTerm;
+        this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.<ReplicatedLogEntry>emptyList();
     }
 
     public long getLastAppliedIndex() {
@@ -61,6 +69,10 @@ public class CaptureSnapshot {
         return replicatedToAllTerm;
     }
 
+    public List<ReplicatedLogEntry> getUnAppliedEntries() {
+        return unAppliedEntries;
+    }
+
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -68,7 +80,9 @@ public class CaptureSnapshot {
                 .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=")
                 .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated)
                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=")
-                .append(replicatedToAllTerm).append("]");
+                .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]");
         return builder.toString();
     }
+
+
 }
index 1289ed7..977cf0e 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
@@ -20,6 +19,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +51,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
-        private volatile byte[] snapshot;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -112,19 +111,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
 
         @Override
         public void createSnapshot(ActorRef actorRef) {
-            if(snapshot != null) {
-                getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
+            try {
+                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
 
-        @Override
-        public void applyRecoverySnapshot(byte[] bytes) {
-        }
-
-        void setSnapshot(byte[] snapshot) {
-            this.snapshot = snapshot;
-        }
-
         public ActorRef collectorActor() {
             return collectorActor;
         }
@@ -158,6 +151,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected long initialTerm = 5;
     protected long currentTerm;
 
+    protected List<Object> expSnapshotState = new ArrayList<>();
+
     @After
     public void tearDown() {
         InMemoryJournal.clear();
@@ -215,13 +210,20 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         });
     }
 
+    @SuppressWarnings("unchecked")
     protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm,
-            int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) {
+            int lastAppliedIndex, long lastTerm, long lastIndex)
+                    throws Exception {
         assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
         assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex());
         assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm());
         assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex());
-        assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState());
+
+        List<Object> actualState = (List<Object>)MockRaftActor.toObject(snapshot.getState());
+        assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size());
+        for(int i = 0; i < expSnapshotState.size(); i++) {
+            assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i));
+        }
     }
 
     protected void verifyPersistedJournal(String persistenceId, List<? extends ReplicatedLogEntry> expJournal) {
index 53110b3..586ca8c 100644 (file)
@@ -154,10 +154,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
     }
 
-
     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
-        LOG.info("{}: applyState called", persistenceId());
+        LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+        state.add(data);
     }
 
     @Override
@@ -235,7 +236,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return this.getId();
     }
 
-    private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+    public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
index 2ced72c..ae9c784 100644 (file)
@@ -124,7 +124,7 @@ public class RaftActorSnapshotMessageSupportTest {
     @Test
     public void testOnCaptureSnapshot() throws Exception {
 
-        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1));
+        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
 
         ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
         verify(mockSnapshotManager).create(procedure.capture());
index 5062f8f..82ebcd1 100644 (file)
@@ -60,10 +60,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
 
+    static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
+
     private TestActorFactory factory;
 
     @Before
@@ -91,6 +95,8 @@ public class RaftActorTest extends AbstractActorTest {
 
     @Test
     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
+
         new JavaTestKit(getSystem()) {{
             String persistenceId = factory.generateActorId("follower-");
 
@@ -101,9 +107,9 @@ public class RaftActorTest extends AbstractActorTest {
             // log entry.
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
+            ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config)), persistenceId);
+                    peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
 
             watch(followerActor);
 
@@ -156,8 +162,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             //reinstate the actor
             TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
-                            Optional.<ConfigParams>of(config)));
+                    MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -176,6 +181,8 @@ public class RaftActorTest extends AbstractActorTest {
 
             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
         }};
+
+        TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
     }
 
     @Test
@@ -275,7 +282,7 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
         mockRaftActor.handleCommand(applySnapshot);
 
-        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
+        CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
         mockRaftActor.handleCommand(captureSnapshot);
 
@@ -863,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
-    private ByteString fromObject(Object snapshot) throws Exception {
+    public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
new file mode 100644 (file)
index 0000000..a8f490e
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
+ *
+ * @author Thomas Pantelis
+ */
+public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+    private MockPayload payload0;
+    private MockPayload payload1;
+
+    @Before
+    public void setup() {
+        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    @Test
+    public void testStatePersistedBetweenSnapshotCaptureAndPersist() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshotReply to the leader.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshotReply.class);
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
+                leaderCollectorActor, CaptureSnapshot.class);
+
+        // First, deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        // Now deliver the CaptureSnapshot to the leader.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
+        leaderActor.tell(captureSnapshot, leaderActor);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
+        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
+        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
+        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
+        // This is a side effect of trimming the persisted log to the sequence number captured at the time
+        // the snapshot was initiated.
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
+                payload3, payload4), leaderActor.underlyingActor().getState());
+    }
+
+    @Test
+    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+
+        send2InitialPayloads();
+
+        // Block these messages initially so we can control the sequence.
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+
+        // Send another payload.
+        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+
+        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Now deliver the AppendEntries to the follower
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
+        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
+    }
+
+    private void reinstateLeaderActor() {
+        killActor(leaderActor);
+
+        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+
+        leaderActor.underlyingActor().waitForRecoveryComplete();
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+    }
+
+    private void send2InitialPayloads() {
+        waitUntilLeader(leaderActor);
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        payload0 = sendPayloadData(leaderActor, "zero");
+        payload1 = sendPayloadData(leaderActor, "one");
+
+        // Verify the leader applies the states.
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+        assertEquals("Leader last applied", 1, leaderContext.getLastApplied());
+
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+    }
+}
index bd670fd..c74705d 100644 (file)
@@ -14,7 +14,7 @@ import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     private MockPayload recoveredPayload0;
     private MockPayload recoveredPayload1;
     private MockPayload recoveredPayload2;
+    private MockPayload payload3;
     private MockPayload payload4;
     private MockPayload payload5;
     private MockPayload payload6;
     private MockPayload payload7;
 
     @Test
-    public void runTest() {
+    public void runTest() throws Exception {
         testLog.info("testReplicationAndSnapshots starting");
 
         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
@@ -55,7 +56,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
         recoveredPayload2 = new MockPayload("two");
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
-        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
 
         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
 
@@ -157,19 +158,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
      * scenario, the follower consensus and application of state is delayed until after the snapshot
      * completes.
+     * @throws Exception
      */
-    private void testFirstSnapshot() {
+    private void testFirstSnapshot() throws Exception {
         testLog.info("testFirstSnapshot starting");
 
-        byte[] snapshot = new byte[] {1,2,3,4};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(recoveredPayload0);
+        expSnapshotState.add(recoveredPayload1);
+        expSnapshotState.add(recoveredPayload2);
 
         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the payload.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        payload3 = sendPayloadData(leaderActor, "three");
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -185,7 +188,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
@@ -286,12 +289,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     /**
      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
      * consensus occurs and the leader applies the state.
+     * @throws Exception
      */
-    private void testSecondSnapshot() {
+    private void testSecondSnapshot() throws Exception {
         testLog.info("testSecondSnapshot starting");
 
-        byte[] snapshot = new byte[] {5,6,7,8};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
@@ -341,11 +347,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
-        // log entry (6).
+        expSnapshotState.add(payload7);
+
+        // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+        // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+        // when the snapshot is created (ie when the CaptureSnapshot is processed).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
@@ -404,6 +413,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
 
         leaderActor.underlyingActor().waitForRecoveryComplete();
 
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());
index d4a9f77..ff9b8ce 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
@@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload0);
+        expSnapshotState.add(payload1);
+        expSnapshotState.add(payload2);
+
         testLog.info("testInitialReplications complete");
     }
 
@@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
                 leader.getReplicatedToAllIndex());
 
-        leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the first payload - this should cause the first snapshot.
@@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
 
         testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
 
@@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
@@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
+        expSnapshotState.add(payload7);
+
         testLog.info("testSubsequentReplicationsAndSnapshots complete");
     }
 
@@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
                 leader.getReplicatedToAllIndex());
 
         leaderActor.underlyingActor().setMockTotalMemory(1000);
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
 
         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
@@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
+        expSnapshotState.add(payload8);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         payload9 = sendPayloadData(leaderActor, "nine", 201);
@@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
@@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+        expSnapshotState.add(payload10);
     }
 
     /**
@@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InstallSnapshot installSnapshot;
         InstallSnapshotReply installSnapshotReply;
 
-        byte[] snapshot = new byte[] {10};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload9);
 
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
@@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
-        assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+        //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
         installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
         assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
@@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
-        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
 
         // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
@@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
@@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+     * @throws Exception
      */
-    private void testFinalReplicationsAndSnapshot() {
+    private void testFinalReplicationsAndSnapshot() throws Exception {
         List<ApplyState> applyStates;
         ApplyState applyState;
 
         testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
 
-        byte[] snapshot = new byte[] {14};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
-
         // Send another payload - a snapshot should occur.
         payload11 = sendPayloadData(leaderActor, "eleven");
 
@@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);
index 5a0d5ae..8ab762f 100644 (file)
@@ -69,6 +69,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
         doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
         ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@@ -384,6 +385,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCommit(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -397,7 +400,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog).snapshotCommit();
 
-        verify(mockDataPersistenceProvider).deleteMessages(100L);
+        verify(mockDataPersistenceProvider).deleteMessages(50L);
 
         ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
 
@@ -438,6 +441,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingCommitMultipleTimesCausesNoHarm(){
+        doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
@@ -453,7 +458,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
-        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
 
         verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
     }
index 0737d75..d482e28 100644 (file)
@@ -216,6 +216,7 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+        LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal != null) {
             synchronized (journal) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.