Refactor snapshot code 10/15610/10
authorMoiz Raja <moraja@cisco.com>
Sun, 22 Feb 2015 20:45:25 +0000 (12:45 -0800)
committerMoiz Raja <moraja@cisco.com>
Wed, 25 Mar 2015 18:13:28 +0000 (11:13 -0700)
Snapshot manipulation code and logic is now spread across multiple classes
and is becoming difficult to maintain and understand. This is an attempt to
simplify the implementation and make it easy to move forward.

Change-Id: I34e61c9d4bb4663d5f87d6c11fb6ed75c1bb33f2
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
new file mode 100644 (file)
index 0000000..cb38e82
--- /dev/null
@@ -0,0 +1,398 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+    private final SnapshotState IDLE = new Idle();
+    private final SnapshotState CAPTURING = new Capturing();
+    private final SnapshotState PERSISTING = new Persisting();
+    private final SnapshotState CREATING = new Creating();
+
+    private final Logger LOG;
+    private final RaftActorContext context;
+    private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+            new LastAppliedTermInformationReader();
+    private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+            new ReplicatedToAllTermInformationReader();
+
+
+    private SnapshotState currentState = IDLE;
+    private CaptureSnapshot captureSnapshot;
+
+    public SnapshotManager(RaftActorContext context, Logger logger) {
+        this.context = context;
+        this.LOG = logger;
+    }
+
+    @Override
+    public boolean isCapturing() {
+        return currentState.isCapturing();
+    }
+
+    @Override
+    public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
+    }
+
+    @Override
+    public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        currentState.capture(lastLogEntry, replicatedToAllIndex);
+    }
+
+    @Override
+    public void create(Procedure<Void> callback) {
+        currentState.create(callback);
+    }
+
+    @Override
+    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
+    }
+
+    @Override
+    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        currentState.commit(persistenceProvider, sequenceNumber);
+    }
+
+    @Override
+    public void rollback() {
+        currentState.rollback();
+    }
+
+    @Override
+    public long trimLog(long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
+    }
+
+    private boolean hasFollowers(){
+        return context.getPeerAddresses().keySet().size() > 0;
+    }
+
+    private String persistenceId(){
+        return context.getId();
+    }
+
+    private class AbstractSnapshotState implements SnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return false;
+        }
+
+        @Override
+        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            LOG.debug("capture should not be called in state {}", this);
+        }
+
+        @Override
+        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            LOG.debug("captureToInstall should not be called in state {}", this);
+        }
+
+        @Override
+        public void create(Procedure<Void> callback) {
+            LOG.debug("create should not be called in state {}", this);
+        }
+
+        @Override
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+            LOG.debug("persist should not be called in state {}", this);
+        }
+
+        @Override
+        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+            LOG.debug("commit should not be called in state {}", this);
+        }
+
+        @Override
+        public void rollback() {
+            LOG.debug("rollback should not be called in state {}", this);
+        }
+
+        @Override
+        public long trimLog(long desiredTrimIndex) {
+            LOG.debug("trimLog should not be called in state {}", this);
+            return -1;
+        }
+
+        protected long doTrimLog(long desiredTrimIndex){
+            //  we would want to keep the lastApplied as its used while capturing snapshots
+            long lastApplied = context.getLastApplied();
+            long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+                LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+                        context.getTermInformation().getCurrentTerm());
+
+                //use the term of the temp-min, since we check for isPresent, entry will not be null
+                ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+                context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+                context.getReplicatedLog().snapshotCommit();
+                return tempMin;
+            }
+
+            return -1;
+        }
+    }
+
+    private class Idle extends AbstractSnapshotState {
+
+        private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
+            TermInformationReader lastAppliedTermInfoReader =
+                    lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+                            lastLogEntry, hasFollowers());
+
+            long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+            long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+            TermInformationReader replicatedToAllTermInfoReader =
+                    replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+            long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+            long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+            // send a CaptureSnapshot to self to make the expensive operation async.
+            captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+                    lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
+
+            SnapshotManager.this.currentState = CAPTURING;
+
+            LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
+                    captureSnapshot);
+
+            context.getActor().tell(captureSnapshot, context.getActor());
+        }
+
+        @Override
+        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            capture(lastLogEntry, replicatedToAllIndex, false);
+        }
+
+        @Override
+        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            capture(lastLogEntry, replicatedToAllIndex, true);
+        }
+
+        @Override
+        public String toString() {
+            return "Idle";
+        }
+
+        @Override
+        public long trimLog(long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
+        }
+    }
+
+    private class Capturing extends AbstractSnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return true;
+        }
+
+        @Override
+        public void create(Procedure<Void> callback) {
+            try {
+                callback.apply(null);
+                SnapshotManager.this.currentState = CREATING;
+            } catch (Exception e) {
+                LOG.error("Unexpected error occurred", e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Capturing";
+        }
+
+    }
+
+    private class Creating extends AbstractSnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return true;
+        }
+
+        @Override
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                            RaftActorBehavior currentBehavior) {
+            // create a snapshot object from the state provided and save it
+            // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+            Snapshot sn = Snapshot.create(snapshotBytes,
+                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+            persistenceProvider.saveSnapshot(sn);
+
+            LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+            long dataThreshold = Runtime.getRuntime().totalMemory() *
+                    context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+            if (context.getReplicatedLog().dataSize() > dataThreshold) {
+                // if memory is less, clear the log based on lastApplied.
+                // this could/should only happen if one of the followers is down
+                // as normally we keep removing from the log when its replicated to all.
+                context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                        captureSnapshot.getLastAppliedTerm());
+
+                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+            } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+                // clear the log based on replicatedToAllIndex
+                context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                        captureSnapshot.getReplicatedToAllTerm());
+
+                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+            } else {
+                // The replicatedToAllIndex was not found in the log
+                // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+                // In this scenario we may need to save the snapshot to the akka persistence
+                // snapshot for recovery but we do not need to do the replicated log trimming.
+                context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
+            }
+
+            LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+                            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
+
+            if (context.getId().equals(currentBehavior.getLeaderId())
+                    && captureSnapshot.isInstallSnapshotInitiated()) {
+                // this would be call straight to the leader and won't initiate in serialization
+                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+                        ByteString.copyFrom(snapshotBytes)));
+            }
+
+            captureSnapshot = null;
+            SnapshotManager.this.currentState = PERSISTING;
+        }
+
+        @Override
+        public String toString() {
+            return "Creating";
+        }
+
+    }
+
+    private class Persisting extends AbstractSnapshotState {
+
+        @Override
+        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+            context.getReplicatedLog().snapshotCommit();
+            persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+            persistenceProvider.deleteMessages(sequenceNumber);
+
+            SnapshotManager.this.currentState = IDLE;
+        }
+
+        @Override
+        public void rollback() {
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+                            "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    context.getReplicatedLog().size());
+
+            SnapshotManager.this.currentState = IDLE;
+        }
+
+        @Override
+        public String toString() {
+            return "Persisting";
+        }
+
+    }
+
+    private static interface TermInformationReader {
+        long getIndex();
+        long getTerm();
+    }
+
+    private static class LastAppliedTermInformationReader implements TermInformationReader{
+        private long index;
+        private long term;
+
+        public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+                                         ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+            ReplicatedLogEntry entry = log.get(originalIndex);
+            this.index = -1L;
+            this.term = -1L;
+            if (!hasFollowers) {
+                if(lastLogEntry != null) {
+                    index = lastLogEntry.getIndex();
+                    term = lastLogEntry.getTerm();
+                }
+            } else if (entry != null) {
+                index = entry.getIndex();
+                term = entry.getTerm();
+            } else if(originalIndex == log.getSnapshotIndex()){
+                index = log.getSnapshotIndex();
+                term = log.getSnapshotTerm();
+            }
+            return this;
+        }
+
+        @Override
+        public long getIndex(){
+            return this.index;
+        }
+
+        @Override
+        public long getTerm(){
+            return this.term;
+        }
+    }
+
+    private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+        private long index;
+        private long term;
+
+        ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+            ReplicatedLogEntry entry = log.get(originalIndex);
+            this.index = -1L;
+            this.term = -1L;
+
+            if (entry != null) {
+                index = entry.getIndex();
+                term = entry.getTerm();
+            }
+
+            return this;
+        }
+
+        @Override
+        public long getIndex(){
+            return this.index;
+        }
+
+        @Override
+        public long getTerm(){
+            return this.term;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
new file mode 100644 (file)
index 0000000..2ff30ec
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+    /**
+     * Should return true when a snapshot is being captured
+     * @return
+     */
+    boolean isCapturing();
+
+    /**
+     * Initiate capture snapshot
+     *
+     * @param lastLogEntry the last entry in the replicated log
+     * @param replicatedToAllIndex the current replicatedToAllIndex
+     */
+    void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+    /**
+     * Initiate capture snapshot for the purposing of installing that snapshot
+     *
+     * @param lastLogEntry
+     * @param replicatedToAllIndex
+     */
+    void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+    /**
+     * Create the snapshot
+     *
+     * @param callback a procedure to be called which should create the snapshot
+     */
+    void create(Procedure<Void> callback);
+
+    /**
+     * Persist the snapshot
+     *
+     * @param persistenceProvider
+     * @param snapshotBytes
+     * @param currentBehavior
+     */
+    void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior);
+
+    /**
+     * Commit the snapshot by trimming the log
+     *
+     * @param persistenceProvider
+     * @param sequenceNumber
+     */
+    void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+    /**
+     * Rollback the snapshot
+     */
+    void rollback();
+
+    /**
+     * Trim the log
+     *
+     * @param desiredTrimIndex
+     * @return the actual trim index
+     */
+    long trimLog(long desiredTrimIndex);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
new file mode 100644 (file)
index 0000000..90272fe
--- /dev/null
@@ -0,0 +1,528 @@
+package org.opendaylight.controller.cluster.raft;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.testkit.TestActorRef;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotManagerTest extends AbstractActorTest {
+
+    @Mock
+    private RaftActorContext mockRaftActorContext;
+
+    @Mock
+    private ConfigParams mockConfigParams;
+
+    @Mock
+    private ReplicatedLog mockReplicatedLog;
+
+    @Mock
+    private DataPersistenceProvider mockDataPersistenceProvider;
+
+    @Mock
+    private RaftActorBehavior mockRaftActorBehavior;
+
+    @Mock
+    private Procedure<Void> mockProcedure;
+
+    private SnapshotManager snapshotManager;
+
+    private TestActorFactory factory;
+
+    private TestActorRef<MessageCollectorActor> actorRef;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+        doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
+        doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+        doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
+        doReturn("123").when(mockRaftActorContext).getId();
+        doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+
+        snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
+        factory = new TestActorFactory(getSystem());
+
+        actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+        doReturn(actorRef).when(mockRaftActorContext).getActor();
+
+    }
+
+    @After
+    public void tearDown(){
+        factory.close();
+    }
+
+    @Test
+    public void testConstruction(){
+        assertEquals(false, snapshotManager.isCapturing());
+    }
+
+    @Test
+    public void testCaptureToInstall(){
+
+        // Force capturing toInstall = true
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new MockRaftActorContext.MockPayload()), 0);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+
+        // LastIndex and LastTerm are picked up from the lastLogEntry
+        assertEquals(0L, captureSnapshot.getLastIndex());
+        assertEquals(1L, captureSnapshot.getLastTerm());
+
+        // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+        assertEquals(0L, captureSnapshot.getLastAppliedIndex());
+        assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+        //
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+        actorRef.underlyingActor().clear();
+    }
+
+    @Test
+    public void testCapture(){
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        // LastIndex and LastTerm are picked up from the lastLogEntry
+        assertEquals(9L, captureSnapshot.getLastIndex());
+        assertEquals(1L, captureSnapshot.getLastTerm());
+
+        // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+        assertEquals(9L, captureSnapshot.getLastAppliedIndex());
+        assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+        //
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+
+        actorRef.underlyingActor().clear();
+
+    }
+
+    @Test
+    public void testIllegalCapture() throws Exception {
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+        assertEquals(1, allMatching.size());
+
+        // This will not cause snapshot capture to start again
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+        assertEquals(1, allMatching.size());
+    }
+
+    @Test
+    public void testPersistWhenReplicatedToAllIndexMinusOne(){
+        doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+        doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+
+        ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+        verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+        Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+        assertEquals(6, snapshot.getLastAppliedTerm());
+        assertEquals(9, snapshot.getLastAppliedIndex());
+        assertEquals(9, snapshot.getLastIndex());
+        assertEquals(6, snapshot.getLastTerm());
+        assertEquals(10, snapshot.getState().length);
+        assertTrue(Arrays.equals(bytes, snapshot.getState()));
+        assertEquals(0, snapshot.getUnAppliedEntries().size());
+
+        verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
+    }
+
+
+    @Test
+    public void testCreate() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(1)).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateBeforeCapture() throws Exception {
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(0)).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateAfterPersist() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(1)).apply(null);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        Mockito.reset(mockProcedure);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, never()).apply(null);
+    }
+
+    @Test
+    public void testPersistWhenReplicatedToAllIndexNotMinus(){
+        doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+        doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
+        doReturn(6L).when(replicatedLogEntry).getTerm();
+        doReturn(9L).when(replicatedLogEntry).getIndex();
+
+        // when replicatedToAllIndex != -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
+    }
+
+
+    @Test
+    public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1);
+    }
+
+    @Test
+    public void testPersistSendInstallSnapshot(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
+                = ArgumentCaptor.forClass(SendInstallSnapshot.class);
+
+        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
+
+        SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
+
+        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+    }
+
+    @Test
+    public void testCallingPersistWithoutCaptureWillDoNothing(){
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+    }
+    @Test
+    public void testCallingPersistTwiceWillDoNoHarm(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+    }
+
+    @Test
+    public void testCommit(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog).snapshotCommit();
+
+        verify(mockDataPersistenceProvider).deleteMessages(100L);
+
+        ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+
+        verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
+
+        assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
+                                                                     // config snapShotBatchCount = 10
+                                                                     // therefore maxSequenceNumber = 90
+    }
+
+    @Test
+    public void testCommitBeforePersist(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
+
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+    }
+
+    @Test
+    public void testCommitBeforeCapture(){
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
+
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+    }
+
+    @Test
+    public void testCallingCommitMultipleTimesCausesNoHarm(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, times(1)).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+
+        verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+    }
+
+    @Test
+    public void testRollback(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog).snapshotRollback();
+    }
+
+
+    @Test
+    public void testRollbackBeforePersist(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, never()).snapshotRollback();
+    }
+
+    @Test
+    public void testRollbackBeforeCapture(){
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, never()).snapshotRollback();
+    }
+
+    @Test
+    public void testCallingRollbackMultipleTimesCausesNoHarm(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.rollback();
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, times(1)).snapshotRollback();
+    }
+
+    @Test
+    public void testTrimLog(){
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10);
+
+        verify(mockReplicatedLog).snapshotPreCommit(10, 5);
+        verify(mockReplicatedLog).snapshotCommit();
+    }
+
+    @Test
+    public void testTrimLogAfterCapture(){
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10);
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+    }
+
+    @Test
+    public void testTrimLogAfterCaptureToInstall(){
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10);
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+    }
+
+}
\ No newline at end of file