From: Tom Pantelis Date: Sat, 4 Apr 2015 03:39:52 +0000 (-0400) Subject: Bug 2948: Recovered log entries not applied after prior snapshot X-Git-Tag: release/lithium~265^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c190bfc14468c9ad954201a53326df941161c470 Bug 2948: Recovered log entries not applied after prior snapshot Modified SnapshotManager#capture to obtain and cache the last journal sequence number. On commit, the cached sequence number is used to delete messages. This preserves any ApplyJournalEntries and ReplicatedLogEntry messages that occur after the capture is initiated. As a result of his change, we also have to obtain the unapplied entries at the time the snapahot is initiated and not when we persist. Otherwise, log entries persisted after the capture is initiated and before it's persisted would be included in the snapshot's unapplied list. On recovery, both entries would be recovered and added to the in-memory journal resulting in duplicate entries. These changes may result in a subsequent modification that is applied after the snapshot is initiated to be included in the snapshot and also persisted in the journal or included as unapplied in the snapshot. On recovery, the modification entry would be redundantly applied. This could result in data tree errors, eg if the modification was a delete, the redundant apply would cause an exception due to the non-existent node. We'll have to live with that - I think the only way to prevent it is if we create the snapshot synchronously. Note this could also have occurred before these changes. Change-Id: I745d94f5417e17627c1c4d53be8d6fdf01587d35 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index 5dc8361cc4..57603a5058 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -122,17 +122,28 @@ class RaftActorRecoverySupport { } private void onRecoveredApplyLogEntries(long toIndex) { + long lastUnappliedIndex = context.getLastApplied() + 1; + if(log.isDebugEnabled()) { - log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", - context.getId(), context.getLastApplied() + 1, toIndex); + log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}", + context.getId(), lastUnappliedIndex, toIndex); } - for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { - batchRecoveredLogEntry(replicatedLog().get(i)); + long lastApplied = lastUnappliedIndex - 1; + for (long i = lastUnappliedIndex; i <= toIndex; i++) { + ReplicatedLogEntry logEntry = replicatedLog().get(i); + if(logEntry != null) { + lastApplied++; + batchRecoveredLogEntry(logEntry); + } else { + // Shouldn't happen but cover it anyway. + log.error("Log entry not found for index {}", i); + break; + } } - context.setLastApplied(toIndex); - context.setCommitIndex(toIndex); + context.setLastApplied(lastApplied); + context.setCommitIndex(lastApplied); } private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 8121f75191..f4f936bf16 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; import com.google.protobuf.ByteString; +import java.util.List; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -19,7 +20,6 @@ import org.slf4j.Logger; public class SnapshotManager implements SnapshotState { - private final SnapshotState IDLE = new Idle(); private final SnapshotState CAPTURING = new Capturing(); private final SnapshotState PERSISTING = new Persisting(); @@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState { private SnapshotState currentState = IDLE; private CaptureSnapshot captureSnapshot; + private long lastSequenceNumber = -1; public SnapshotManager(RaftActorContext context, Logger logger) { this.context = context; @@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState { long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); // send a CaptureSnapshot to self to make the expensive operation async. + + List unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1); + captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, - newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null); + newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null); SnapshotManager.this.currentState = CAPTURING; - if(targetFollower != null){ - LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); - } else { + if(captureSnapshot.isInstallSnapshotInitiated()) { LOG.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); + } else { + LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot); } + lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber(); + + LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber); + context.getActor().tell(captureSnapshot, context.getActor()); return true; @@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState { // when snapshot is saved async, SaveSnapshotSuccess is raised. Snapshot sn = Snapshot.create(snapshotBytes, - context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getUnAppliedEntries(), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState { persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); - persistenceProvider.deleteMessages(sequenceNumber); + persistenceProvider.deleteMessages(lastSequenceNumber); + lastSequenceNumber = -1; SnapshotManager.this.currentState = IDLE; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java index daa8f7768a..7c182f04e4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java @@ -8,6 +8,10 @@ package org.opendaylight.controller.cluster.raft.base.messages; +import java.util.Collections; +import java.util.List; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + public class CaptureSnapshot { private final long lastAppliedIndex; private final long lastAppliedTerm; @@ -16,14 +20,17 @@ public class CaptureSnapshot { private final boolean installSnapshotInitiated; private final long replicatedToAllIndex; private final long replicatedToAllTerm; + private final List unAppliedEntries; - public CaptureSnapshot(long lastIndex, long lastTerm, - long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) { - this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false); + public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, long lastAppliedTerm, + long replicatedToAllIndex, long replicatedToAllTerm, List unAppliedEntries) { + this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm, + unAppliedEntries, false); } - public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex, - long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) { + public CaptureSnapshot(long lastIndex, long lastTerm, long lastAppliedIndex, + long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, + List unAppliedEntries, boolean installSnapshotInitiated) { this.lastIndex = lastIndex; this.lastTerm = lastTerm; this.lastAppliedIndex = lastAppliedIndex; @@ -31,6 +38,7 @@ public class CaptureSnapshot { this.installSnapshotInitiated = installSnapshotInitiated; this.replicatedToAllIndex = replicatedToAllIndex; this.replicatedToAllTerm = replicatedToAllTerm; + this.unAppliedEntries = unAppliedEntries != null ? unAppliedEntries : Collections.emptyList(); } public long getLastAppliedIndex() { @@ -61,6 +69,10 @@ public class CaptureSnapshot { return replicatedToAllTerm; } + public List getUnAppliedEntries() { + return unAppliedEntries; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -68,7 +80,9 @@ public class CaptureSnapshot { .append(lastAppliedTerm).append(", lastIndex=").append(lastIndex).append(", lastTerm=") .append(lastTerm).append(", installSnapshotInitiated=").append(installSnapshotInitiated) .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", replicatedToAllTerm=") - .append(replicatedToAllTerm).append("]"); + .append(replicatedToAllTerm).append(", unAppliedEntries size=").append(unAppliedEntries.size()).append("]"); return builder.toString(); } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 1289ed7f90..977cf0ef5e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import akka.actor.ActorRef; import akka.actor.PoisonPill; @@ -20,6 +19,7 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -51,7 +51,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest private final TestActorRef collectorActor; private final Map, Boolean> dropMessages = new ConcurrentHashMap<>(); - private volatile byte[] snapshot; private TestRaftActor(String id, Map peerAddresses, ConfigParams config, TestActorRef collectorActor) { @@ -112,19 +111,13 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest @Override public void createSnapshot(ActorRef actorRef) { - if(snapshot != null) { - getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender()); + try { + actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef); + } catch (Exception e) { + e.printStackTrace(); } } - @Override - public void applyRecoverySnapshot(byte[] bytes) { - } - - void setSnapshot(byte[] snapshot) { - this.snapshot = snapshot; - } - public ActorRef collectorActor() { return collectorActor; } @@ -158,6 +151,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected long initialTerm = 5; protected long currentTerm; + protected List expSnapshotState = new ArrayList<>(); + @After public void tearDown() { InMemoryJournal.clear(); @@ -215,13 +210,20 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest }); } + @SuppressWarnings("unchecked") protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm, - int lastAppliedIndex, long lastTerm, long lastIndex, byte[] data) { + int lastAppliedIndex, long lastTerm, long lastIndex) + throws Exception { assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm()); assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex()); assertEquals(prefix + " Snapshot getLastTerm", lastTerm, snapshot.getLastTerm()); assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex()); - assertArrayEquals(prefix + " Snapshot getState", data, snapshot.getState()); + + List actualState = (List)MockRaftActor.toObject(snapshot.getState()); + assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size()); + for(int i = 0; i < expSnapshotState.size(); i++) { + assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i)); + } } protected void verifyPersistedJournal(String persistenceId, List expJournal) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 53110b3583..586ca8cda0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -154,10 +154,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier)); } - @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { actorDelegate.applyState(clientActor, identifier, data); - LOG.info("{}: applyState called", persistenceId()); + LOG.info("{}: applyState called: {}", persistenceId(), data); + + state.add(data); } @Override @@ -235,7 +236,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return this.getId(); } - private Object toObject(byte[] bs) throws ClassNotFoundException, IOException { + public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index 2ced72c531..ae9c784a55 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -124,7 +124,7 @@ public class RaftActorSnapshotMessageSupportTest { @Test public void testOnCaptureSnapshot() throws Exception { - sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1)); + sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null)); ArgumentCaptor procedure = ArgumentCaptor.forClass(Procedure.class); verify(mockSnapshotManager).create(procedure.capture()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 5062f8f6e0..82ebcd1fbd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -60,10 +60,14 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; public class RaftActorTest extends AbstractActorTest { + static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class); + private TestActorFactory factory; @Before @@ -91,6 +95,8 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception { + TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting"); + new JavaTestKit(getSystem()) {{ String persistenceId = factory.generateActorId("follower-"); @@ -101,9 +107,9 @@ public class RaftActorTest extends AbstractActorTest { // log entry. config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + ImmutableMap peerAddresses = ImmutableMap.builder().put("member1", "address").build(); ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId, - ImmutableMap.builder().put("member1", "address").build(), - Optional.of(config)), persistenceId); + peerAddresses, Optional.of(config)), persistenceId); watch(followerActor); @@ -156,8 +162,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = factory.createTestActor( - MockRaftActor.props(persistenceId, Collections.emptyMap(), - Optional.of(config))); + MockRaftActor.props(persistenceId, peerAddresses, Optional.of(config))); MockRaftActor mockRaftActor = ref.underlyingActor(); @@ -176,6 +181,8 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState()); }}; + + TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending"); } @Test @@ -275,7 +282,7 @@ public class RaftActorTest extends AbstractActorTest { doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot)); mockRaftActor.handleCommand(applySnapshot); - CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1); + CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null); doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot)); mockRaftActor.handleCommand(captureSnapshot); @@ -863,7 +870,7 @@ public class RaftActorTest extends AbstractActorTest { }}; } - private ByteString fromObject(Object snapshot) throws Exception { + public static ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; try { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java new file mode 100644 index 0000000000..a8f490e751 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertEquals; +import akka.persistence.SaveSnapshotSuccess; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; + +/** + * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication. + * + * @author Thomas Pantelis + */ +public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { + + private MockPayload payload0; + private MockPayload payload1; + + @Before + public void setup() { + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + + peerAddresses = ImmutableMap.builder(). + put(follower1Id, follower1Actor.path().toString()).build(); + + leaderConfigParams = newLeaderConfigParams(); + leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + + follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); + leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + } + + @Test + public void testStatePersistedBetweenSnapshotCaptureAndPersist() { + + send2InitialPayloads(); + + // Block these messages initially so we can control the sequence. + leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); + leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class); + follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // This should trigger a snapshot. + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + + MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshot.class); + + // First, deliver the CaptureSnapshot to the leader. + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); + leaderActor.tell(captureSnapshot, leaderActor); + + // Send another payload. + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + + // Now deliver the AppendEntries to the follower + follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + + // Now deliver the CaptureSnapshotReply to the leader. + CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshotReply.class); + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class); + leaderActor.tell(captureSnapshotReply, leaderActor); + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + reinstateLeaderActor(); + + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4), + leaderActor.underlyingActor().getState()); + } + + @Test + public void testStatePersistedBetweenInitiateSnapshotAndCapture() { + + send2InitialPayloads(); + + // Block these messages initially so we can control the sequence. + leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); + follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // This should trigger a snapshot. + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + + // Send another payload. + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + + MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshot.class); + + // First, deliver the AppendEntries to the follower + follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + + // Now deliver the CaptureSnapshot to the leader. + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); + leaderActor.tell(captureSnapshot, leaderActor); + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + reinstateLeaderActor(); + + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + + // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so + // were included in the snapshot. They were also included as unapplied entries in the snapshot as + // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the + // state on recovery by the ApplyJournalEntries messages which remained in the persisted log. + // This is a side effect of trimming the persisted log to the sequence number captured at the time + // the snapshot was initiated. + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2, + payload3, payload4), leaderActor.underlyingActor().getState()); + } + + @Test + public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() { + + send2InitialPayloads(); + + // Block these messages initially so we can control the sequence. + follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + // This should trigger a snapshot. + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + + // Send another payload. + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + + MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + // Now deliver the AppendEntries to the follower + follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + + reinstateLeaderActor(); + + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4), + leaderActor.underlyingActor().getState()); + } + + private void reinstateLeaderActor() { + killActor(leaderActor); + + leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + + leaderActor.underlyingActor().waitForRecoveryComplete(); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + } + + private void send2InitialPayloads() { + waitUntilLeader(leaderActor); + currentTerm = leaderContext.getTermInformation().getCurrentTerm(); + + payload0 = sendPayloadData(leaderActor, "zero"); + payload1 = sendPayloadData(leaderActor, "one"); + + // Verify the leader applies the states. + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2); + + assertEquals("Leader last applied", 1, leaderContext.getLastApplied()); + + MessageCollectorActor.clearMessages(leaderCollectorActor); + MessageCollectorActor.clearMessages(follower1CollectorActor); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index bd670fd581..c74705d13f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -14,7 +14,7 @@ import java.util.List; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt private MockPayload recoveredPayload0; private MockPayload recoveredPayload1; private MockPayload recoveredPayload2; + private MockPayload payload3; private MockPayload payload4; private MockPayload payload5; private MockPayload payload6; private MockPayload payload7; @Test - public void runTest() { + public void runTest() throws Exception { testLog.info("testReplicationAndSnapshots starting"); // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less @@ -55,7 +56,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1)); recoveredPayload2 = new MockPayload("two"); InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2)); - InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2)); + InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2)); origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class); @@ -157,19 +158,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this * scenario, the follower consensus and application of state is delayed until after the snapshot * completes. + * @throws Exception */ - private void testFirstSnapshot() { + private void testFirstSnapshot() throws Exception { testLog.info("testFirstSnapshot starting"); - byte[] snapshot = new byte[] {1,2,3,4}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(recoveredPayload0); + expSnapshotState.add(recoveredPayload1); + expSnapshotState.add(recoveredPayload2); // Delay the consensus by temporarily dropping the AppendEntries to both followers. follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send the payload. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); + payload3 = sendPayloadData(leaderActor, "three"); // Wait for snapshot complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); @@ -185,7 +188,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3); @@ -286,12 +289,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt /** * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until * consensus occurs and the leader applies the state. + * @throws Exception */ - private void testSecondSnapshot() { + private void testSecondSnapshot() throws Exception { testLog.info("testSecondSnapshot starting"); - byte[] snapshot = new byte[] {5,6,7,8}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload3); + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); // Delay the CaptureSnapshot message to the leader actor. leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); @@ -341,11 +347,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 7, leaderContext.getCommitIndex()); - // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied - // log entry (6). + expSnapshotState.add(payload7); + + // Verify the persisted snapshot. This should reflect the snapshot index as the last applied + // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data + // when the snapshot is created (ie when the CaptureSnapshot is processed). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7); @@ -404,6 +413,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt leaderActor.underlyingActor().waitForRecoveryComplete(); + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex()); assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index d4a9f7701b..ff9b8ce630 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; @@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload0); + expSnapshotState.add(payload1); + expSnapshotState.add(payload2); + testLog.info("testInitialReplications complete"); } @@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); - leaderActor.underlyingActor().setSnapshot(new byte[] {2}); - follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send the first payload - this should cause the first snapshot. @@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - byte[] snapshot = new byte[] {6}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload3); testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads"); @@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); @@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); + expSnapshotState.add(payload7); + testLog.info("testSubsequentReplicationsAndSnapshots complete"); } @@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A leader.getReplicatedToAllIndex()); leaderActor.underlyingActor().setMockTotalMemory(1000); - byte[] snapshot = new byte[] {6}; - leaderActor.underlyingActor().setSnapshot(snapshot); // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal. InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2); @@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class); Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot); + expSnapshotState.add(payload8); + // Send another payload with a large enough relative size in combination with the last payload // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot. payload9 = sendPayloadData(leaderActor, "nine", 201); @@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9); @@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + + expSnapshotState.add(payload10); } /** @@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A InstallSnapshot installSnapshot; InstallSnapshotReply installSnapshotReply; - byte[] snapshot = new byte[] {10}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload9); // Now stop dropping AppendEntries in follower 2. follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); @@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks()); assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm()); assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex()); - assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); + //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class); assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm()); @@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify follower 2 applies the snapshot. applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); - verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot); + verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size()); // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot. @@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9); unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size()); @@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A /** * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower. + * @throws Exception */ - private void testFinalReplicationsAndSnapshot() { + private void testFinalReplicationsAndSnapshot() throws Exception { List applyStates; ApplyState applyState; testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); - byte[] snapshot = new byte[] {14}; - leaderActor.underlyingActor().setSnapshot(snapshot); - // Send another payload - a snapshot should occur. payload11 = sendPayloadData(leaderActor, "eleven"); @@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's last persisted snapshot (previous ones may not be purged yet). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11); List unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 5a0d5aed74..8ab762f786 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -69,6 +69,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); doReturn("123").when(mockRaftActorContext).getId(); + doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider(); doReturn("123").when(mockRaftActorBehavior).getLeaderId(); ElectionTerm mockElectionTerm = mock(ElectionTerm.class); @@ -384,6 +385,8 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCommit(){ + doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); + // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); @@ -397,7 +400,7 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockReplicatedLog).snapshotCommit(); - verify(mockDataPersistenceProvider).deleteMessages(100L); + verify(mockDataPersistenceProvider).deleteMessages(50L); ArgumentCaptor criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); @@ -438,6 +441,8 @@ public class SnapshotManagerTest extends AbstractActorTest { @Test public void testCallingCommitMultipleTimesCausesNoHarm(){ + doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); + // when replicatedToAllIndex = -1 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); @@ -453,7 +458,7 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockReplicatedLog, times(1)).snapshotCommit(); - verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L); + verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L); verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class)); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java index 0737d75a7f..d482e28401 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java @@ -216,6 +216,7 @@ public class InMemoryJournal extends AsyncWriteJournal { @Override public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr); Map journal = journals.get(persistenceId); if(journal != null) { synchronized (journal) {