Merge "Bug 2948: Perform create snapshot synchronously"
authorMoiz Raja <moraja@cisco.com>
Tue, 21 Apr 2015 07:15:28 +0000 (07:15 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 21 Apr 2015 07:15:29 +0000 (07:15 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 157a53ed2d1797b662f6fadccd3acf4027ae442f..1101af8442e656d749acd379bdda0b298058685c 100644 (file)
@@ -136,6 +136,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getConfigParams().getJournalRecoveryLogBatchSize());
 
         super.preStart();
+
+        snapshotSupport = newRaftActorSnapshotMessageSupport();
     }
 
     @Override
@@ -193,10 +195,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleCommand(Object message) {
-        if(snapshotSupport == null) {
-            snapshotSupport = newRaftActorSnapshotMessageSupport();
-        }
-
         boolean handled = snapshotSupport.handleSnapshotMessage(message);
         if(handled) {
             return;
index 790ff89510ab222473a68b8105eab31c32158dd2..6860b791d37bf6b54b3a42781f2c1062fc64a8b6 100644 (file)
@@ -12,7 +12,6 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
@@ -45,6 +44,8 @@ class RaftActorSnapshotMessageSupport {
         this.currentBehavior = currentBehavior;
         this.cohort = cohort;
         this.log = context.getLogger();
+
+        context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
     }
 
     boolean handleSnapshotMessage(Object message) {
@@ -57,9 +58,6 @@ class RaftActorSnapshotMessageSupport {
         } else if (message instanceof SaveSnapshotFailure) {
             onSaveSnapshotFailure((SaveSnapshotFailure) message);
             return true;
-        } else if (message instanceof CaptureSnapshot) {
-            onCaptureSnapshot(message);
-            return true;
         } else if (message instanceof CaptureSnapshotReply) {
             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
             return true;
@@ -77,12 +75,6 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
     }
 
-    private void onCaptureSnapshot(Object message) {
-        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
-
-        context.getSnapshotManager().create(createSnapshotProcedure);
-    }
-
     private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
         log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                 context.getId(), saveSnapshotFailure.cause());
index f4f936bf161b2260f9cae4f397ded3af706f2294..5b0ebcddee9fac1c0a9a7bdf8df31917c8eb2f3b 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -21,7 +22,6 @@ import org.slf4j.Logger;
 public class SnapshotManager implements SnapshotState {
 
     private final SnapshotState IDLE = new Idle();
-    private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
     private final SnapshotState CREATING = new Creating();
 
@@ -37,6 +37,8 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
+    private Procedure<Void> createSnapshotProcedure;
+
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
         this.LOG = logger;
@@ -57,11 +59,6 @@ public class SnapshotManager implements SnapshotState {
         return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
-    @Override
-    public void create(Procedure<Void> callback) {
-        currentState.create(callback);
-    }
-
     @Override
     public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
                         RaftActorBehavior currentBehavior, long totalMemory) {
@@ -83,6 +80,15 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex, currentBehavior);
     }
 
+    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
+    @VisibleForTesting
+    public CaptureSnapshot getCaptureSnapshot() {
+        return captureSnapshot;
+    }
+
     private boolean hasFollowers(){
         return context.getPeerAddresses().keySet().size() > 0;
     }
@@ -110,11 +116,6 @@ public class SnapshotManager implements SnapshotState {
             return false;
         }
 
-        @Override
-        public void create(Procedure<Void> callback) {
-            LOG.debug("create should not be called in state {}", this);
-        }
-
         @Override
         public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
                             RaftActorBehavior currentBehavior, long totalMemory) {
@@ -192,8 +193,6 @@ public class SnapshotManager implements SnapshotState {
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
                     newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
-            SnapshotManager.this.currentState = CAPTURING;
-
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
@@ -205,8 +204,14 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
-            context.getActor().tell(captureSnapshot, context.getActor());
+            try {
+                createSnapshotProcedure.apply(null);
+            } catch (Exception e) {
+                LOG.error("Error creating snapshot", e);
+                return false;
+            }
 
+            SnapshotManager.this.currentState = CREATING;
             return true;
         }
 
@@ -231,30 +236,6 @@ public class SnapshotManager implements SnapshotState {
         }
     }
 
-    private class Capturing extends AbstractSnapshotState {
-
-        @Override
-        public boolean isCapturing() {
-            return true;
-        }
-
-        @Override
-        public void create(Procedure<Void> callback) {
-            try {
-                callback.apply(null);
-                SnapshotManager.this.currentState = CREATING;
-            } catch (Exception e) {
-                LOG.error("Unexpected error occurred", e);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "Capturing";
-        }
-
-    }
-
     private class Creating extends AbstractSnapshotState {
 
         @Override
index 9a9bf1c774a49c3a87cb285a599da382ecad9931..b0cbf4b00f6c9f39daadbc23b76b8eab81df89b2 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
@@ -40,13 +39,6 @@ public interface SnapshotState {
      */
     boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
 
-    /**
-     * Create the snapshot
-     *
-     * @param callback a procedure to be called which should create the snapshot
-     */
-    void create(Procedure<Void> callback);
-
     /**
      * Persist the snapshot
      *
index adf7778fe7daa068d5f6e1bfab0110f6665bc62c..4aa3b2fb4e62349f8c96b06e235b5ea8e7378bcf 100644 (file)
@@ -200,6 +200,7 @@ public class MockRaftActorContext implements RaftActorContext {
     public SnapshotManager getSnapshotManager() {
         if(this.snapshotManager == null){
             this.snapshotManager = new SnapshotManager(this, getLogger());
+            this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
         }
         return this.snapshotManager;
     }
index ae9c784a556496a6edb8d14b82fe9e7e315ea61e..943f65364175f32b365951b251d16480b49d034e 100644 (file)
@@ -13,7 +13,6 @@ import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
-import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
@@ -21,14 +20,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
@@ -120,20 +117,6 @@ public class RaftActorSnapshotMessageSupportTest {
         verify(mockCohort).applySnapshot(snapshotBytes);
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Test
-    public void testOnCaptureSnapshot() throws Exception {
-
-        sendMessageToSupport(new CaptureSnapshot(3, 1, 2, 1, 2, 1, null));
-
-        ArgumentCaptor<Procedure> procedure = ArgumentCaptor.forClass(Procedure.class);
-        verify(mockSnapshotManager).create(procedure.capture());
-
-        procedure.getValue().apply(null);
-
-        verify(mockCohort).createSnapshot(same(mockRaftActorRef));
-    }
-
     @Test
     public void testOnCaptureSnapshotReply() {
 
index a8f490e75119678fc84084c50c85dec204941b1d..33e5c4bcdf4e324ebd842c651fdbb99e8ad46f59 100644 (file)
@@ -15,7 +15,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -53,7 +52,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         send2InitialPayloads();
 
         // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
@@ -64,13 +62,6 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
         // Send another payload.
         MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
@@ -102,60 +93,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
     }
 
     @Test
-    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
-
-        send2InitialPayloads();
-
-        // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
-        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
-
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
-
-        // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
-
-        // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
-
-        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
-
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the AppendEntries to the follower
-        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
-
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
-        // Now deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for snapshot complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
-
-        reinstateLeaderActor();
-
-        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
-        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
-        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
-        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
-        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
-        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
-
-        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
-        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
-        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
-        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
-        // This is a side effect of trimming the persisted log to the sequence number captured at the time
-        // the snapshot was initiated.
-        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
-                payload3, payload4), leaderActor.underlyingActor().getState());
-    }
-
-    @Test
-    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+    public void testStatePersistedAfterSnapshotPersisted() {
 
         send2InitialPayloads();
 
index c74705d13f6c682f30d2553665a34552f62bbc47..be748f3d26c3ae64d32ebac624e8100972459783 100644 (file)
@@ -16,7 +16,6 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -300,14 +299,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
 
         // Send the payload.
         payload7 = sendPayloadData(leaderActor, "seven");
 
-        // Capture the CaptureSnapshot message so we can send it later.
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
+        // Capture the CaptureSnapshotReply message so we can send it later.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+                CaptureSnapshotReply.class);
 
         // Wait for the state to be applied in the leader.
         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
@@ -327,12 +326,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
 
-        // Now deliver the CaptureSnapshot.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for CaptureSnapshotReply to complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+        // Now deliver the CaptureSnapshotReply.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -347,8 +343,6 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        expSnapshotState.add(payload7);
-
         // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
         // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
         // when the snapshot is created (ie when the CaptureSnapshot is processed).
@@ -398,6 +392,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
 
+        expSnapshotState.add(payload7);
+
         testLog.info("testSecondSnapshot ending");
     }
 
index 8ab762f78612a5b540d93512128c4e820b0bbc64..47720d95b6bcfd73458a923e3703f30049349fd7 100644 (file)
@@ -7,6 +7,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -19,7 +20,6 @@ import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,6 +82,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
         doReturn(actorRef).when(mockRaftActorContext).getActor();
 
+        snapshotManager.setCreateSnapshotCallable(mockProcedure);
     }
 
     @After
@@ -95,7 +96,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCaptureToInstall(){
+    public void testCaptureToInstall() throws Exception {
 
         // Force capturing toInstall = true
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
@@ -103,7 +104,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
+
+        CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
         // LastIndex and LastTerm are picked up from the lastLogEntry
         assertEquals(0L, captureSnapshot.getLastIndex());
@@ -120,7 +123,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCapture(){
+    public void testCapture() throws Exception {
         boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
@@ -128,7 +131,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
+
+        CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
         // LastIndex and LastTerm are picked up from the lastLogEntry
         assertEquals(9L, captureSnapshot.getLastIndex());
         assertEquals(1L, captureSnapshot.getLastTerm());
@@ -145,6 +151,20 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     }
 
+    @Test
+    public void testCaptureWithCreateProcedureError () throws Exception {
+        doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertFalse(capture);
+
+        assertEquals(false, snapshotManager.isCapturing());
+
+        verify(mockProcedure).apply(null);
+    }
+
     @Test
     public void testIllegalCapture() throws Exception {
         boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
@@ -152,9 +172,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+        verify(mockProcedure).apply(null);
 
-        assertEquals(1, allMatching.size());
+        reset(mockProcedure);
 
         // This will not cause snapshot capture to start again
         capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
@@ -162,9 +182,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertFalse(capture);
 
-        allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
-
-        assertEquals(1, allMatching.size());
+        verify(mockProcedure, never()).apply(null);
     }
 
     @Test
@@ -188,8 +206,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(lastLogEntry, -1);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
         snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
@@ -209,60 +225,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
     }
 
-
-    @Test
-    public void testCreate() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure).apply(null);
-
-        assertEquals("isCapturing", true, snapshotManager.isCapturing());
-    }
-
-    @Test
-    public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(1)).apply(null);
-    }
-
-    @Test
-    public void testCallingCreateBeforeCapture() throws Exception {
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(0)).apply(null);
-    }
-
-    @Test
-    public void testCallingCreateAfterPersist() throws Exception {
-        // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, times(1)).apply(null);
-
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
-                , Runtime.getRuntime().totalMemory());
-
-        reset(mockProcedure);
-
-        snapshotManager.create(mockProcedure);
-
-        verify(mockProcedure, never()).apply(null);
-    }
-
     @Test
     public void testPersistWhenReplicatedToAllIndexNotMinus(){
         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
@@ -276,8 +238,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
         snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
@@ -308,8 +268,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
                 new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -328,8 +286,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         assertTrue(capture);
 
-        snapshotManager.create(mockProcedure);
-
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
 
         snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
@@ -368,8 +324,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -391,8 +345,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -447,8 +399,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -469,8 +419,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
@@ -504,8 +452,6 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
-        snapshotManager.create(mockProcedure);
-
         snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
                 , Runtime.getRuntime().totalMemory());
 
index ba0bd0f29c96237ab758487b719eac959d115edc..0255020328655dfe0dee151207f66f9a51eddcc5 100644 (file)
@@ -536,7 +536,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
     }
 
     @Test
@@ -584,7 +584,9 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
 
         assertTrue(cs.isInstallSnapshotInitiated());
         assertEquals(3, cs.getLastAppliedIndex());
@@ -595,8 +597,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test