Merge "Refactor snapshot code"
authorTom Pantelis <tpanteli@brocade.com>
Thu, 26 Mar 2015 19:22:37 +0000 (19:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 26 Mar 2015 19:22:37 +0000 (19:22 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
new file mode 100644 (file)
index 0000000..cb38e82
--- /dev/null
@@ -0,0 +1,398 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+    private final SnapshotState IDLE = new Idle();
+    private final SnapshotState CAPTURING = new Capturing();
+    private final SnapshotState PERSISTING = new Persisting();
+    private final SnapshotState CREATING = new Creating();
+
+    private final Logger LOG;
+    private final RaftActorContext context;
+    private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+            new LastAppliedTermInformationReader();
+    private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+            new ReplicatedToAllTermInformationReader();
+
+
+    private SnapshotState currentState = IDLE;
+    private CaptureSnapshot captureSnapshot;
+
+    public SnapshotManager(RaftActorContext context, Logger logger) {
+        this.context = context;
+        this.LOG = logger;
+    }
+
+    @Override
+    public boolean isCapturing() {
+        return currentState.isCapturing();
+    }
+
+    @Override
+    public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
+    }
+
+    @Override
+    public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        currentState.capture(lastLogEntry, replicatedToAllIndex);
+    }
+
+    @Override
+    public void create(Procedure<Void> callback) {
+        currentState.create(callback);
+    }
+
+    @Override
+    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
+    }
+
+    @Override
+    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        currentState.commit(persistenceProvider, sequenceNumber);
+    }
+
+    @Override
+    public void rollback() {
+        currentState.rollback();
+    }
+
+    @Override
+    public long trimLog(long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
+    }
+
+    private boolean hasFollowers(){
+        return context.getPeerAddresses().keySet().size() > 0;
+    }
+
+    private String persistenceId(){
+        return context.getId();
+    }
+
+    private class AbstractSnapshotState implements SnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return false;
+        }
+
+        @Override
+        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            LOG.debug("capture should not be called in state {}", this);
+        }
+
+        @Override
+        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            LOG.debug("captureToInstall should not be called in state {}", this);
+        }
+
+        @Override
+        public void create(Procedure<Void> callback) {
+            LOG.debug("create should not be called in state {}", this);
+        }
+
+        @Override
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+            LOG.debug("persist should not be called in state {}", this);
+        }
+
+        @Override
+        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+            LOG.debug("commit should not be called in state {}", this);
+        }
+
+        @Override
+        public void rollback() {
+            LOG.debug("rollback should not be called in state {}", this);
+        }
+
+        @Override
+        public long trimLog(long desiredTrimIndex) {
+            LOG.debug("trimLog should not be called in state {}", this);
+            return -1;
+        }
+
+        protected long doTrimLog(long desiredTrimIndex){
+            //  we would want to keep the lastApplied as its used while capturing snapshots
+            long lastApplied = context.getLastApplied();
+            long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+                LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+                        context.getTermInformation().getCurrentTerm());
+
+                //use the term of the temp-min, since we check for isPresent, entry will not be null
+                ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+                context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+                context.getReplicatedLog().snapshotCommit();
+                return tempMin;
+            }
+
+            return -1;
+        }
+    }
+
+    private class Idle extends AbstractSnapshotState {
+
+        private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
+            TermInformationReader lastAppliedTermInfoReader =
+                    lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+                            lastLogEntry, hasFollowers());
+
+            long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+            long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+            TermInformationReader replicatedToAllTermInfoReader =
+                    replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+            long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+            long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+            // send a CaptureSnapshot to self to make the expensive operation async.
+            captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+                    lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
+
+            SnapshotManager.this.currentState = CAPTURING;
+
+            LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
+                    captureSnapshot);
+
+            context.getActor().tell(captureSnapshot, context.getActor());
+        }
+
+        @Override
+        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            capture(lastLogEntry, replicatedToAllIndex, false);
+        }
+
+        @Override
+        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            capture(lastLogEntry, replicatedToAllIndex, true);
+        }
+
+        @Override
+        public String toString() {
+            return "Idle";
+        }
+
+        @Override
+        public long trimLog(long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
+        }
+    }
+
+    private class Capturing extends AbstractSnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return true;
+        }
+
+        @Override
+        public void create(Procedure<Void> callback) {
+            try {
+                callback.apply(null);
+                SnapshotManager.this.currentState = CREATING;
+            } catch (Exception e) {
+                LOG.error("Unexpected error occurred", e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Capturing";
+        }
+
+    }
+
+    private class Creating extends AbstractSnapshotState {
+
+        @Override
+        public boolean isCapturing() {
+            return true;
+        }
+
+        @Override
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                            RaftActorBehavior currentBehavior) {
+            // create a snapshot object from the state provided and save it
+            // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+            Snapshot sn = Snapshot.create(snapshotBytes,
+                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+            persistenceProvider.saveSnapshot(sn);
+
+            LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+            long dataThreshold = Runtime.getRuntime().totalMemory() *
+                    context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+            if (context.getReplicatedLog().dataSize() > dataThreshold) {
+                // if memory is less, clear the log based on lastApplied.
+                // this could/should only happen if one of the followers is down
+                // as normally we keep removing from the log when its replicated to all.
+                context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                        captureSnapshot.getLastAppliedTerm());
+
+                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+            } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+                // clear the log based on replicatedToAllIndex
+                context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                        captureSnapshot.getReplicatedToAllTerm());
+
+                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+            } else {
+                // The replicatedToAllIndex was not found in the log
+                // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+                // In this scenario we may need to save the snapshot to the akka persistence
+                // snapshot for recovery but we do not need to do the replicated log trimming.
+                context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm());
+            }
+
+            LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+                            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
+
+            if (context.getId().equals(currentBehavior.getLeaderId())
+                    && captureSnapshot.isInstallSnapshotInitiated()) {
+                // this would be call straight to the leader and won't initiate in serialization
+                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+                        ByteString.copyFrom(snapshotBytes)));
+            }
+
+            captureSnapshot = null;
+            SnapshotManager.this.currentState = PERSISTING;
+        }
+
+        @Override
+        public String toString() {
+            return "Creating";
+        }
+
+    }
+
+    private class Persisting extends AbstractSnapshotState {
+
+        @Override
+        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+            context.getReplicatedLog().snapshotCommit();
+            persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+            persistenceProvider.deleteMessages(sequenceNumber);
+
+            SnapshotManager.this.currentState = IDLE;
+        }
+
+        @Override
+        public void rollback() {
+            context.getReplicatedLog().snapshotRollback();
+
+            LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+                            "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+                    context.getReplicatedLog().getSnapshotIndex(),
+                    context.getReplicatedLog().getSnapshotTerm(),
+                    context.getReplicatedLog().size());
+
+            SnapshotManager.this.currentState = IDLE;
+        }
+
+        @Override
+        public String toString() {
+            return "Persisting";
+        }
+
+    }
+
+    private static interface TermInformationReader {
+        long getIndex();
+        long getTerm();
+    }
+
+    private static class LastAppliedTermInformationReader implements TermInformationReader{
+        private long index;
+        private long term;
+
+        public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+                                         ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+            ReplicatedLogEntry entry = log.get(originalIndex);
+            this.index = -1L;
+            this.term = -1L;
+            if (!hasFollowers) {
+                if(lastLogEntry != null) {
+                    index = lastLogEntry.getIndex();
+                    term = lastLogEntry.getTerm();
+                }
+            } else if (entry != null) {
+                index = entry.getIndex();
+                term = entry.getTerm();
+            } else if(originalIndex == log.getSnapshotIndex()){
+                index = log.getSnapshotIndex();
+                term = log.getSnapshotTerm();
+            }
+            return this;
+        }
+
+        @Override
+        public long getIndex(){
+            return this.index;
+        }
+
+        @Override
+        public long getTerm(){
+            return this.term;
+        }
+    }
+
+    private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+        private long index;
+        private long term;
+
+        ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+            ReplicatedLogEntry entry = log.get(originalIndex);
+            this.index = -1L;
+            this.term = -1L;
+
+            if (entry != null) {
+                index = entry.getIndex();
+                term = entry.getTerm();
+            }
+
+            return this;
+        }
+
+        @Override
+        public long getIndex(){
+            return this.index;
+        }
+
+        @Override
+        public long getTerm(){
+            return this.term;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
new file mode 100644 (file)
index 0000000..2ff30ec
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+    /**
+     * Should return true when a snapshot is being captured
+     * @return
+     */
+    boolean isCapturing();
+
+    /**
+     * Initiate capture snapshot
+     *
+     * @param lastLogEntry the last entry in the replicated log
+     * @param replicatedToAllIndex the current replicatedToAllIndex
+     */
+    void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+    /**
+     * Initiate capture snapshot for the purposing of installing that snapshot
+     *
+     * @param lastLogEntry
+     * @param replicatedToAllIndex
+     */
+    void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+    /**
+     * Create the snapshot
+     *
+     * @param callback a procedure to be called which should create the snapshot
+     */
+    void create(Procedure<Void> callback);
+
+    /**
+     * Persist the snapshot
+     *
+     * @param persistenceProvider
+     * @param snapshotBytes
+     * @param currentBehavior
+     */
+    void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior);
+
+    /**
+     * Commit the snapshot by trimming the log
+     *
+     * @param persistenceProvider
+     * @param sequenceNumber
+     */
+    void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+    /**
+     * Rollback the snapshot
+     */
+    void rollback();
+
+    /**
+     * Trim the log
+     *
+     * @param desiredTrimIndex
+     * @return the actual trim index
+     */
+    long trimLog(long desiredTrimIndex);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
new file mode 100644 (file)
index 0000000..90272fe
--- /dev/null
@@ -0,0 +1,528 @@
+package org.opendaylight.controller.cluster.raft;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.testkit.TestActorRef;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotManagerTest extends AbstractActorTest {
+
+    @Mock
+    private RaftActorContext mockRaftActorContext;
+
+    @Mock
+    private ConfigParams mockConfigParams;
+
+    @Mock
+    private ReplicatedLog mockReplicatedLog;
+
+    @Mock
+    private DataPersistenceProvider mockDataPersistenceProvider;
+
+    @Mock
+    private RaftActorBehavior mockRaftActorBehavior;
+
+    @Mock
+    private Procedure<Void> mockProcedure;
+
+    private SnapshotManager snapshotManager;
+
+    private TestActorFactory factory;
+
+    private TestActorRef<MessageCollectorActor> actorRef;
+
+    @Before
+    public void setUp(){
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+        doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
+        doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+        doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
+        doReturn("123").when(mockRaftActorContext).getId();
+        doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+
+        snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
+        factory = new TestActorFactory(getSystem());
+
+        actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+        doReturn(actorRef).when(mockRaftActorContext).getActor();
+
+    }
+
+    @After
+    public void tearDown(){
+        factory.close();
+    }
+
+    @Test
+    public void testConstruction(){
+        assertEquals(false, snapshotManager.isCapturing());
+    }
+
+    @Test
+    public void testCaptureToInstall(){
+
+        // Force capturing toInstall = true
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new MockRaftActorContext.MockPayload()), 0);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+
+        // LastIndex and LastTerm are picked up from the lastLogEntry
+        assertEquals(0L, captureSnapshot.getLastIndex());
+        assertEquals(1L, captureSnapshot.getLastTerm());
+
+        // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+        assertEquals(0L, captureSnapshot.getLastAppliedIndex());
+        assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+        //
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+        actorRef.underlyingActor().clear();
+    }
+
+    @Test
+    public void testCapture(){
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+        // LastIndex and LastTerm are picked up from the lastLogEntry
+        assertEquals(9L, captureSnapshot.getLastIndex());
+        assertEquals(1L, captureSnapshot.getLastTerm());
+
+        // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+        assertEquals(9L, captureSnapshot.getLastAppliedIndex());
+        assertEquals(1L, captureSnapshot.getLastAppliedTerm());
+
+        //
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+        assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+
+        actorRef.underlyingActor().clear();
+
+    }
+
+    @Test
+    public void testIllegalCapture() throws Exception {
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+        assertEquals(1, allMatching.size());
+
+        // This will not cause snapshot capture to start again
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+
+        assertEquals(1, allMatching.size());
+    }
+
+    @Test
+    public void testPersistWhenReplicatedToAllIndexMinusOne(){
+        doReturn("123").when(mockRaftActorContext).getId();
+        doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+        doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+
+        ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+        verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+
+        Snapshot snapshot = snapshotArgumentCaptor.getValue();
+
+        assertEquals(6, snapshot.getLastAppliedTerm());
+        assertEquals(9, snapshot.getLastAppliedIndex());
+        assertEquals(9, snapshot.getLastIndex());
+        assertEquals(6, snapshot.getLastTerm());
+        assertEquals(10, snapshot.getState().length);
+        assertTrue(Arrays.equals(bytes, snapshot.getState()));
+        assertEquals(0, snapshot.getUnAppliedEntries().size());
+
+        verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
+    }
+
+
+    @Test
+    public void testCreate() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(1)).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateBeforeCapture() throws Exception {
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(0)).apply(null);
+    }
+
+    @Test
+    public void testCallingCreateAfterPersist() throws Exception {
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, times(1)).apply(null);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        Mockito.reset(mockProcedure);
+
+        snapshotManager.create(mockProcedure);
+
+        verify(mockProcedure, never()).apply(null);
+    }
+
+    @Test
+    public void testPersistWhenReplicatedToAllIndexNotMinus(){
+        doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
+        doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
+        doReturn(6L).when(replicatedLogEntry).getTerm();
+        doReturn(9L).when(replicatedLogEntry).getIndex();
+
+        // when replicatedToAllIndex != -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
+    }
+
+
+    @Test
+    public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1);
+    }
+
+    @Test
+    public void testPersistSendInstallSnapshot(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
+                = ArgumentCaptor.forClass(SendInstallSnapshot.class);
+
+        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
+
+        SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
+
+        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+    }
+
+    @Test
+    public void testCallingPersistWithoutCaptureWillDoNothing(){
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+    }
+    @Test
+    public void testCallingPersistTwiceWillDoNoHarm(){
+        doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+        verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
+    }
+
+    @Test
+    public void testCommit(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog).snapshotCommit();
+
+        verify(mockDataPersistenceProvider).deleteMessages(100L);
+
+        ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+
+        verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
+
+        assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
+                                                                     // config snapShotBatchCount = 10
+                                                                     // therefore maxSequenceNumber = 90
+    }
+
+    @Test
+    public void testCommitBeforePersist(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
+
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+    }
+
+    @Test
+    public void testCommitBeforeCapture(){
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
+
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+    }
+
+    @Test
+    public void testCallingCommitMultipleTimesCausesNoHarm(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        snapshotManager.commit(mockDataPersistenceProvider, 100L);
+
+        verify(mockReplicatedLog, times(1)).snapshotCommit();
+
+        verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+
+        verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+    }
+
+    @Test
+    public void testRollback(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog).snapshotRollback();
+    }
+
+
+    @Test
+    public void testRollbackBeforePersist(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, never()).snapshotRollback();
+    }
+
+    @Test
+    public void testRollbackBeforeCapture(){
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, never()).snapshotRollback();
+    }
+
+    @Test
+    public void testCallingRollbackMultipleTimesCausesNoHarm(){
+        // when replicatedToAllIndex = -1
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1);
+
+        snapshotManager.create(mockProcedure);
+
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+
+        snapshotManager.rollback();
+
+        snapshotManager.rollback();
+
+        verify(mockReplicatedLog, times(1)).snapshotRollback();
+    }
+
+    @Test
+    public void testTrimLog(){
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10);
+
+        verify(mockReplicatedLog).snapshotPreCommit(10, 5);
+        verify(mockReplicatedLog).snapshotCommit();
+    }
+
+    @Test
+    public void testTrimLogAfterCapture(){
+        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10);
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+    }
+
+    @Test
+    public void testTrimLogAfterCaptureToInstall(){
+        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9);
+
+        assertEquals(true, snapshotManager.isCapturing());
+
+        ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
+        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(20L).when(mockRaftActorContext).getLastApplied();
+        doReturn(true).when(mockReplicatedLog).isPresent(10);
+        doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+        doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(5L).when(replicatedLogEntry).getTerm();
+
+        snapshotManager.trimLog(10);
+
+        verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
+        verify(mockReplicatedLog, never()).snapshotCommit();
+
+    }
+
+}
\ No newline at end of file