From 4a60a637941ccf77ab7b32484cbc4128eaf3ea7c Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 8 Apr 2015 03:32:15 -0400 Subject: [PATCH] Bug 2948: Perform create snapshot synchronously Performing the create snapshot phase asynchronously via sending the CaptureSnapshot message may result in subsequent modifications that are applied after the CaptureSnapshot message is sent but before it's received to be included in the snapshot and also included as unapplied entries in the snapshot. On recovery, the modifications would be redundantly applied. This could result in data tree errors, eg if a modification was a delete, the redundant apply would cause an exception due to the non-existent node. To avoid this, the SnapshotManager was changed to immediately call the create procedure in the capture method instead of sending the CaptureSnapshot message. The create procedure is now set in the SnapshotManager by the RaftActorSnapshotMessageSupport. The Capturing state was removed. Change-Id: I0efe8966d83e019c9dd8a82799193ea7ac49ba65 Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 6 +- .../raft/RaftActorSnapshotMessageSupport.java | 12 +- .../cluster/raft/SnapshotManager.java | 57 +++------ .../cluster/raft/SnapshotState.java | 8 -- .../cluster/raft/MockRaftActorContext.java | 1 + .../RaftActorSnapshotMessageSupportTest.java | 17 --- .../cluster/raft/RecoveryIntegrationTest.java | 64 +--------- ...eplicationAndSnapshotsIntegrationTest.java | 22 ++-- .../cluster/raft/SnapshotManagerTest.java | 110 +++++------------- .../cluster/raft/behaviors/LeaderTest.java | 9 +- 10 files changed, 67 insertions(+), 239 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 157a53ed2d..1101af8442 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -136,6 +136,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getConfigParams().getJournalRecoveryLogBatchSize()); super.preStart(); + + snapshotSupport = newRaftActorSnapshotMessageSupport(); } @Override @@ -193,10 +195,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void handleCommand(Object message) { - if(snapshotSupport == null) { - snapshotSupport = newRaftActorSnapshotMessageSupport(); - } - boolean handled = snapshotSupport.handleSnapshotMessage(message); if(handled) { return; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 790ff89510..6860b791d3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -12,7 +12,6 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; @@ -45,6 +44,8 @@ class RaftActorSnapshotMessageSupport { this.currentBehavior = currentBehavior; this.cohort = cohort; this.log = context.getLogger(); + + context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure); } boolean handleSnapshotMessage(Object message) { @@ -57,9 +58,6 @@ class RaftActorSnapshotMessageSupport { } else if (message instanceof SaveSnapshotFailure) { onSaveSnapshotFailure((SaveSnapshotFailure) message); return true; - } else if (message instanceof CaptureSnapshot) { - onCaptureSnapshot(message); - return true; } else if (message instanceof CaptureSnapshotReply) { onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); return true; @@ -77,12 +75,6 @@ class RaftActorSnapshotMessageSupport { context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory()); } - private void onCaptureSnapshot(Object message) { - log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message); - - context.getSnapshotManager().create(createSnapshotProcedure); - } - private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) { log.error("{}: SaveSnapshotFailure received for snapshot Cause:", context.getId(), saveSnapshotFailure.cause()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index f4f936bf16..5b0ebcddee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import java.util.List; import org.opendaylight.controller.cluster.DataPersistenceProvider; @@ -21,7 +22,6 @@ import org.slf4j.Logger; public class SnapshotManager implements SnapshotState { private final SnapshotState IDLE = new Idle(); - private final SnapshotState CAPTURING = new Capturing(); private final SnapshotState PERSISTING = new Persisting(); private final SnapshotState CREATING = new Creating(); @@ -37,6 +37,8 @@ public class SnapshotManager implements SnapshotState { private CaptureSnapshot captureSnapshot; private long lastSequenceNumber = -1; + private Procedure createSnapshotProcedure; + public SnapshotManager(RaftActorContext context, Logger logger) { this.context = context; this.LOG = logger; @@ -57,11 +59,6 @@ public class SnapshotManager implements SnapshotState { return currentState.capture(lastLogEntry, replicatedToAllIndex); } - @Override - public void create(Procedure callback) { - currentState.create(callback); - } - @Override public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { @@ -83,6 +80,15 @@ public class SnapshotManager implements SnapshotState { return currentState.trimLog(desiredTrimIndex, currentBehavior); } + public void setCreateSnapshotCallable(Procedure createSnapshotProcedure) { + this.createSnapshotProcedure = createSnapshotProcedure; + } + + @VisibleForTesting + public CaptureSnapshot getCaptureSnapshot() { + return captureSnapshot; + } + private boolean hasFollowers(){ return context.getPeerAddresses().keySet().size() > 0; } @@ -110,11 +116,6 @@ public class SnapshotManager implements SnapshotState { return false; } - @Override - public void create(Procedure callback) { - LOG.debug("create should not be called in state {}", this); - } - @Override public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { @@ -192,8 +193,6 @@ public class SnapshotManager implements SnapshotState { lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null); - SnapshotManager.this.currentState = CAPTURING; - if(captureSnapshot.isInstallSnapshotInitiated()) { LOG.info("{}: Initiating snapshot capture {} to install on {}", persistenceId(), captureSnapshot, targetFollower); @@ -205,8 +204,14 @@ public class SnapshotManager implements SnapshotState { LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber); - context.getActor().tell(captureSnapshot, context.getActor()); + try { + createSnapshotProcedure.apply(null); + } catch (Exception e) { + LOG.error("Error creating snapshot", e); + return false; + } + SnapshotManager.this.currentState = CREATING; return true; } @@ -231,30 +236,6 @@ public class SnapshotManager implements SnapshotState { } } - private class Capturing extends AbstractSnapshotState { - - @Override - public boolean isCapturing() { - return true; - } - - @Override - public void create(Procedure callback) { - try { - callback.apply(null); - SnapshotManager.this.currentState = CREATING; - } catch (Exception e) { - LOG.error("Unexpected error occurred", e); - } - } - - @Override - public String toString() { - return "Capturing"; - } - - } - private class Creating extends AbstractSnapshotState { @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java index 9a9bf1c774..b0cbf4b00f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.raft; -import akka.japi.Procedure; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -40,13 +39,6 @@ public interface SnapshotState { */ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower); - /** - * Create the snapshot - * - * @param callback a procedure to be called which should create the snapshot - */ - void create(Procedure callback); - /** * Persist the snapshot * diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index adf7778fe7..4aa3b2fb4e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -200,6 +200,7 @@ public class MockRaftActorContext implements RaftActorContext { public SnapshotManager getSnapshotManager() { if(this.snapshotManager == null){ this.snapshotManager = new SnapshotManager(this, getLogger()); + this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.instance()); } return this.snapshotManager; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java index ae9c784a55..943f653641 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java @@ -13,7 +13,6 @@ import static org.mockito.Matchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import akka.actor.ActorRef; -import akka.japi.Procedure; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotMetadata; @@ -21,14 +20,12 @@ import java.util.Arrays; import java.util.Collections; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; @@ -120,20 +117,6 @@ public class RaftActorSnapshotMessageSupportTest { verify(mockCohort).applySnapshot(snapshotBytes); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testOnCaptureSnapshot() throws Exception { - - sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null)); - - ArgumentCaptor procedure = ArgumentCaptor.forClass(Procedure.class); - verify(mockSnapshotManager).create(procedure.capture()); - - procedure.getValue().apply(null); - - verify(mockCohort).createSnapshot(same(mockRaftActorRef)); - } - @Test public void testOnCaptureSnapshotReply() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java index a8f490e751..33e5c4bcdf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java @@ -15,7 +15,6 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -53,7 +52,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { send2InitialPayloads(); // Block these messages initially so we can control the sequence. - leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class); follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); @@ -64,13 +62,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( - leaderCollectorActor, CaptureSnapshot.class); - - // First, deliver the CaptureSnapshot to the leader. - leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); - leaderActor.tell(captureSnapshot, leaderActor); - // Send another payload. MockPayload payload4 = sendPayloadData(leaderActor, "four"); @@ -102,60 +93,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { } @Test - public void testStatePersistedBetweenInitiateSnapshotAndCapture() { - - send2InitialPayloads(); - - // Block these messages initially so we can control the sequence. - leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); - follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); - - MockPayload payload2 = sendPayloadData(leaderActor, "two"); - - // This should trigger a snapshot. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); - - // Send another payload. - MockPayload payload4 = sendPayloadData(leaderActor, "four"); - - MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); - - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( - leaderCollectorActor, CaptureSnapshot.class); - - // First, deliver the AppendEntries to the follower - follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); - - MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); - - // Now deliver the CaptureSnapshot to the leader. - leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); - leaderActor.tell(captureSnapshot, leaderActor); - - // Wait for snapshot complete. - MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - - reinstateLeaderActor(); - - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); - - // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so - // were included in the snapshot. They were also included as unapplied entries in the snapshot as - // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the - // state on recovery by the ApplyJournalEntries messages which remained in the persisted log. - // This is a side effect of trimming the persisted log to the sequence number captured at the time - // the snapshot was initiated. - assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2, - payload3, payload4), leaderActor.underlyingActor().getState()); - } - - @Test - public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() { + public void testStatePersistedAfterSnapshotPersisted() { send2InitialPayloads(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index c74705d13f..be748f3d26 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -16,7 +16,6 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -300,14 +299,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt expSnapshotState.add(payload6); // Delay the CaptureSnapshot message to the leader actor. - leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); + leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class); // Send the payload. payload7 = sendPayloadData(leaderActor, "seven"); - // Capture the CaptureSnapshot message so we can send it later. - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( - leaderCollectorActor, CaptureSnapshot.class); + // Capture the CaptureSnapshotReply message so we can send it later. + CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, + CaptureSnapshotReply.class); // Wait for the state to be applied in the leader. ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); @@ -327,12 +326,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader last applied", 7, leaderContext.getLastApplied()); assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex()); - // Now deliver the CaptureSnapshot. - leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); - leaderActor.tell(captureSnapshot, leaderActor); - - // Wait for CaptureSnapshotReply to complete. - MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class); + // Now deliver the CaptureSnapshotReply. + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class); + leaderActor.tell(captureSnapshotReply, leaderActor); // Wait for snapshot complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); @@ -347,8 +343,6 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 7, leaderContext.getCommitIndex()); - expSnapshotState.add(payload7); - // Verify the persisted snapshot. This should reflect the snapshot index as the last applied // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data // when the snapshot is created (ie when the CaptureSnapshot is processed). @@ -398,6 +392,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex()); assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex()); + expSnapshotState.add(payload7); + testLog.info("testSecondSnapshot ending"); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 8ab762f786..47720d95b6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -7,6 +7,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -19,7 +20,6 @@ import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -82,6 +82,7 @@ public class SnapshotManagerTest extends AbstractActorTest { actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-")); doReturn(actorRef).when(mockRaftActorContext).getActor(); + snapshotManager.setCreateSnapshotCallable(mockProcedure); } @After @@ -95,7 +96,7 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testCaptureToInstall(){ + public void testCaptureToInstall() throws Exception { // Force capturing toInstall = true snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, @@ -103,7 +104,9 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(true, snapshotManager.isCapturing()); - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + verify(mockProcedure).apply(null); + + CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); // LastIndex and LastTerm are picked up from the lastLogEntry assertEquals(0L, captureSnapshot.getLastIndex()); @@ -120,7 +123,7 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testCapture(){ + public void testCapture() throws Exception { boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, new MockRaftActorContext.MockPayload()), 9); @@ -128,7 +131,10 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(true, snapshotManager.isCapturing()); - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + verify(mockProcedure).apply(null); + + CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); + // LastIndex and LastTerm are picked up from the lastLogEntry assertEquals(9L, captureSnapshot.getLastIndex()); assertEquals(1L, captureSnapshot.getLastTerm()); @@ -145,6 +151,20 @@ public class SnapshotManagerTest extends AbstractActorTest { } + @Test + public void testCaptureWithCreateProcedureError () throws Exception { + doThrow(new Exception("mock")).when(mockProcedure).apply(null); + + boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertFalse(capture); + + assertEquals(false, snapshotManager.isCapturing()); + + verify(mockProcedure).apply(null); + } + @Test public void testIllegalCapture() throws Exception { boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, @@ -152,9 +172,9 @@ public class SnapshotManagerTest extends AbstractActorTest { assertTrue(capture); - List allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + verify(mockProcedure).apply(null); - assertEquals(1, allMatching.size()); + reset(mockProcedure); // This will not cause snapshot capture to start again capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, @@ -162,9 +182,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertFalse(capture); - allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); - - assertEquals(1, allMatching.size()); + verify(mockProcedure, never()).apply(null); } @Test @@ -188,8 +206,6 @@ public class SnapshotManagerTest extends AbstractActorTest { // when replicatedToAllIndex = -1 snapshotManager.capture(lastLogEntry, -1); - snapshotManager.create(mockProcedure); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -209,60 +225,6 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockReplicatedLog).snapshotPreCommit(7L, 1L); } - - @Test - public void testCreate() throws Exception { - // when replicatedToAllIndex = -1 - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, - new MockRaftActorContext.MockPayload()), -1); - - snapshotManager.create(mockProcedure); - - verify(mockProcedure).apply(null); - - assertEquals("isCapturing", true, snapshotManager.isCapturing()); - } - - @Test - public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception { - // when replicatedToAllIndex = -1 - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, - new MockRaftActorContext.MockPayload()), -1); - - snapshotManager.create(mockProcedure); - - snapshotManager.create(mockProcedure); - - verify(mockProcedure, times(1)).apply(null); - } - - @Test - public void testCallingCreateBeforeCapture() throws Exception { - snapshotManager.create(mockProcedure); - - verify(mockProcedure, times(0)).apply(null); - } - - @Test - public void testCallingCreateAfterPersist() throws Exception { - // when replicatedToAllIndex = -1 - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, - new MockRaftActorContext.MockPayload()), -1); - - snapshotManager.create(mockProcedure); - - verify(mockProcedure, times(1)).apply(null); - - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior - , Runtime.getRuntime().totalMemory()); - - reset(mockProcedure); - - snapshotManager.create(mockProcedure); - - verify(mockProcedure, never()).apply(null); - } - @Test public void testPersistWhenReplicatedToAllIndexNotMinus(){ doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); @@ -276,8 +238,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, new MockRaftActorContext.MockPayload()), 9); - snapshotManager.create(mockProcedure); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -308,8 +268,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -328,8 +286,6 @@ public class SnapshotManagerTest extends AbstractActorTest { assertTrue(capture); - snapshotManager.create(mockProcedure); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior @@ -368,8 +324,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -391,8 +345,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -447,8 +399,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -469,8 +419,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); @@ -504,8 +452,6 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, new MockRaftActorContext.MockPayload()), -1, "follower-1"); - snapshotManager.create(mockProcedure); - snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior , Runtime.getRuntime().totalMemory()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index ba0bd0f29c..0255020328 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -536,7 +536,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(raftBehavior instanceof Leader); - MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); } @Test @@ -584,7 +584,9 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); @@ -595,8 +597,7 @@ public class LeaderTest extends AbstractLeaderTest { // if an initiate is started again when first is in progress, it shouldnt initiate Capture leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); - List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); - assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test -- 2.36.6