Bug 2948: Perform create snapshot synchronously 14/18114/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 8 Apr 2015 07:32:15 +0000 (03:32 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 14 Apr 2015 00:53:15 +0000 (20:53 -0400)
Performing the create snapshot phase asynchronously via sending
the CaptureSnapshot message may result in subsequent modifications
that are applied after the CaptureSnapshot message is sent but before
it's received to be included in the snapshot and also included as
unapplied entries in the snapshot. On recovery, the modifications
would be redundantly applied. This could result in data tree errors,
eg if a modification was a delete, the redundant apply would cause an
exception due to the non-existent node.

To avoid this, the SnapshotManager was changed to immediately call the
create procedure in the capture method instead of sending the
CaptureSnapshot message. The create procedure is now set in the
SnapshotManager by the RaftActorSnapshotMessageSupport. The Capturing
state was removed.

Change-Id: I0efe8966d83e019c9dd8a82799193ea7ac49ba65
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 157a53ed2d1797b662f6fadccd3acf4027ae442f..1101af8442e656d749acd379bdda0b298058685c 100644 (file)
@@ -136,6 +136,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getConfigParams().getJournalRecoveryLogBatchSize());
 
         super.preStart();
+
+        snapshotSupport = newRaftActorSnapshotMessageSupport();
     }
 
     @Override
@@ -193,10 +195,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleCommand(Object message) {
-        if(snapshotSupport == null) {
-            snapshotSupport = newRaftActorSnapshotMessageSupport();
-        }
-
         boolean handled = snapshotSupport.handleSnapshotMessage(message);
         if(handled) {
             return;
index 790ff89510ab222473a68b8105eab31c32158dd2..6860b791d37bf6b54b3a42781f2c1062fc64a8b6 100644 (file)
@@ -12,7 +12,6 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
@@ -45,6 +44,8 @@ class RaftActorSnapshotMessageSupport {
         this.currentBehavior = currentBehavior;
         this.cohort = cohort;
         this.log = context.getLogger();
+
+        context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
     }
 
     boolean handleSnapshotMessage(Object message) {
@@ -57,9 +58,6 @@ class RaftActorSnapshotMessageSupport {
         } else if (message instanceof SaveSnapshotFailure) {
             onSaveSnapshotFailure((SaveSnapshotFailure) message);
             return true;
-        } else if (message instanceof CaptureSnapshot) {
-            onCaptureSnapshot(message);
-            return true;
         } else if (message instanceof CaptureSnapshotReply) {
             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
             return true;
@@ -77,12 +75,6 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
     }
 
-    private void onCaptureSnapshot(Object message) {
-        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
-
-        context.getSnapshotManager().create(createSnapshotProcedure);
-    }
-
     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                 context.getId(), saveSnapshotFailure.cause());
index f4f936bf161b2260f9cae4f397ded3af706f2294..5b0ebcddee9fac1c0a9a7bdf8df31917c8eb2f3b 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -21,7 +22,6 @@ import org.slf4j.Logger;
 public class SnapshotManager implements SnapshotState {
 
     private final SnapshotState IDLE = new Idle();
-    private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
     private final SnapshotState CREATING = new Creating();
 
@@ -37,6 +37,8 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
+    private Procedure<Void> createSnapshotProcedure;
+
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
         this.LOG = logger;
@@ -57,11 +59,6 @@ public class SnapshotManager implements SnapshotState {
         return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
-    @Override
-    public void create(Procedure<Void> callback) {
-        currentState.create(callback);
-    }
-
     @Override
     public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
                         RaftActorBehavior currentBehavior, long totalMemory) {
@@ -83,6 +80,15 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex, currentBehavior);
     }
 
+    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
+    @VisibleForTesting
+    public CaptureSnapshot getCaptureSnapshot() {
+        return captureSnapshot;
+    }
+
     private boolean hasFollowers(){
         return context.getPeerAddresses().keySet().size() > 0;
     }
@@ -110,11 +116,6 @@ public class SnapshotManager implements SnapshotState {
             return false;
         }
 
-        @Override
-        public void create(Procedure<Void> callback) {
-            LOG.debug("create should not be called in state {}", this);
-        }
-
         @Override
         public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
                             RaftActorBehavior currentBehavior, long totalMemory) {
@@ -192,8 +193,6 @@ public class SnapshotManager implements SnapshotState {
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
                     newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
-            SnapshotManager.this.currentState = CAPTURING;
-
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
@@ -205,8 +204,14 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
-            context.getActor().tell(captureSnapshot, context.getActor());
+            try {
+                createSnapshotProcedure.apply(null);
+            } catch (Exception e) {
+                LOG.error("Error creating snapshot", e);
+                return false;
+            }
 
+            SnapshotManager.this.currentState = CREATING;
             return true;
         }
 
@@ -231,30 +236,6 @@ public class SnapshotManager implements SnapshotState {
         }
     }
 
-    private class Capturing extends AbstractSnapshotState {
-
-        @Override
-        public boolean isCapturing() {
-            return true;
-        }
-
-        @Override
-        public void create(Procedure<Void> callback) {
-            try {
-                callback.apply(null);
-                SnapshotManager.this.currentState = CREATING;
-            } catch (Exception e) {
-                LOG.error("Unexpected error occurred", e);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "Capturing";
-        }
-
-    }
-
     private class Creating extends AbstractSnapshotState {
 
         @Override
index 9a9bf1c774a49c3a87cb285a599da382ecad9931..b0cbf4b00f6c9f39daadbc23b76b8eab81df89b2 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
@@ -40,13 +39,6 @@ public interface SnapshotState {
      */
     boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
 
-    /**
-     * Create the snapshot
-     *
-     * @param callback a procedure to be called which should create the snapshot
-     */
-    void create(Procedure<Void> callback);
-
     /**
      * Persist the snapshot
      *
index adf7778fe7daa068d5f6e1bfab0110f6665bc62c..4aa3b2fb4e62349f8c96b06e235b5ea8e7378bcf 100644 (file)
@@ -200,6 +200,7 @@ public class MockRaftActorContext implements RaftActorContext {
     public SnapshotManager getSnapshotManager() {
         if(this.snapshotManager == null){
             this.snapshotManager = new SnapshotManager(this, getLogger());
+            this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
         }
         return this.snapshotManager;
     }
index ae9c784a556496a6edb8d14b82fe9e7e315ea61e..943f65364175f32b365951b251d16480b49d034e 100644 (file)
@@ -13,7 +13,6 @@ import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
-import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -21,14 +20,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
@@ -120,20 +117,6 @@ public class RaftActorSnapshotMessageSupportTest {
         verify(mockCohort).applySnapshot(snapshotBytes);
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testOnCaptureSnapshot() throws Exception {
-
-        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
-
-        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
-        verify(mockSnapshotManager).create(procedure.capture());
-
-        procedure.getValue().apply(null);
-
-        verify(mockCohort).createSnapshot(same(mockRaftActorRef));
-    }
-
     @Test
     public void testOnCaptureSnapshotReply() {
 
index a8f490e75119678fc84084c50c85dec204941b1d..33e5c4bcdf4e324ebd842c651fdbb99e8ad46f59 100644 (file)
@@ -15,7 +15,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -53,7 +52,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         send2InitialPayloads();
 
         // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
@@ -64,13 +62,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
         // Send another payload.
         MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
@@ -102,60 +93,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
     }
 
     @Test
-    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
-
-        send2InitialPayloads();
-
-        // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
-        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
-
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
-
-        // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
-
-        // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
-
-        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
-
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the AppendEntries to the follower
-        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
-
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
-        // Now deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for snapshot complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
-
-        reinstateLeaderActor();
-
-        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
-        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
-        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
-        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
-        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
-        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
-
-        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
-        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
-        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
-        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
-        // This is a side effect of trimming the persisted log to the sequence number captured at the time
-        // the snapshot was initiated.
-        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
-                payload3, payload4), leaderActor.underlyingActor().getState());
-    }
-
-    @Test
-    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+    public void testStatePersistedAfterSnapshotPersisted() {
 
         send2InitialPayloads();
 
index c74705d13f6c682f30d2553665a34552f62bbc47..be748f3d26c3ae64d32ebac624e8100972459783 100644 (file)
@@ -16,7 +16,6 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -300,14 +299,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
 
         // Send the payload.
         payload7 = sendPayloadData(leaderActor, "seven");
 
-        // Capture the CaptureSnapshot message so we can send it later.
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
+        // Capture the CaptureSnapshotReply message so we can send it later.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+                CaptureSnapshotReply.class);
 
         // Wait for the state to be applied in the leader.
         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
@@ -327,12 +326,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
 
-        // Now deliver the CaptureSnapshot.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for CaptureSnapshotReply to complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+        // Now deliver the CaptureSnapshotReply.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -347,8 +343,6 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        expSnapshotState.add(payload7);
-
         // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
         // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
         // when the snapshot is created (ie when the CaptureSnapshot is processed).
@@ -398,6 +392,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
 
+        expSnapshotState.add(payload7);
+
         testLog.info("testSecondSnapshot ending");
     }
 
index 8ab762f78612a5b540d93512128c4e820b0bbc64..47720d95b6bcfd73458a923e3703f30049349fd7 100644 (file)
@@ -7,6 +7,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -19,7 +20,6 @@ import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,6 +82,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
         doReturn(actorRef).when(mockRaftActorContext).getActor();
 
+        snapshotManager.setCreateSnapshotCallable(mockProcedure);
     }
 
     @After
@@ -95,7 +96,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCaptureToInstall(){
+    public void testCaptureToInstall() throws Exception {
 
         // Force capturing toInstall = true
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
@@ -103,7 +104,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
+
+        CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
         // LastIndex and LastTerm are picked up from the lastLogEntry
         assertEquals(0L, captureSnapshot.getLastIndex());
@@ -120,7 +123,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCapture(){
+    public void testCapture() throws Exception {
         boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
@@ -128,7 +131,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
+
+        CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
         // LastIndex and LastTerm are picked up from the lastLogEntry
         assertEquals(9L, captureSnapshot.getLastIndex());
         assertEquals(1L, captureSnapshot.getLastTerm());
@@ -145,6 +151,20 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testCaptureWithCreateProcedureError () throws Exception {
+        doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertFalse(capture);
+
+        assertEquals(false, snapshotManager.isCapturing());
+
+        verify(mockProcedure).apply(null);
+    }
+
     @Test
     public void testIllegalCapture() throws Exception {
         boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
@@ -152,9 +172,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
 
-        assertEquals(1, allMatching.size());
+        reset(mockProcedure);
 
         // This will not cause snapshot capture to start again
         capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
@@ -162,9 +182,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertFalse(capture);
 
-        allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
-
-        assertEquals(1, allMatching.size());
+        verify(mockProcedure, never()).apply(null);
     }
 
     @Test
@@ -188,8 +206,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(lastLogEntry, -1);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
         snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
@@ -209,60 +225,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
     }
 
-
-    @Test
-    public void testCreate() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure).apply(null);
-
-        assertEquals("isCapturing", true, snapshotManager.isCapturing());
-    }
-
-    @Test
-    public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(1)).apply(null);
-    }
-
-    @Test
-    public void testCallingCreateBeforeCapture() throws Exception {
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(0)).apply(null);
-    }
-
-    @Test
-    public void testCallingCreateAfterPersist() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(1)).apply(null);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
-
-        reset(mockProcedure);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, never()).apply(null);
-    }
-
     @Test
     public void testPersistWhenReplicatedToAllIndexNotMinus(){
         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
@@ -276,8 +238,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
         snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
@@ -308,8 +268,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -328,8 +286,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
 
         snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
@@ -368,8 +324,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -391,8 +345,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -447,8 +399,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -469,8 +419,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -504,8 +452,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
index ba0bd0f29c96237ab758487b719eac959d115edc..0255020328655dfe0dee151207f66f9a51eddcc5 100644 (file)
@@ -536,7 +536,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
     }
 
     @Test
@@ -584,7 +584,9 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
         assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
@@ -595,8 +597,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test