Migrate SnapshotManagerTest
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / SnapshotManagerTest.java
index 27596dff74f6a0b83ebab2775cf507d7515900fb..3133c5bc45f96e71b1291d3bb5a6d496f5c8432d 100644 (file)
@@ -12,8 +12,9 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -21,26 +22,34 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorRef;
-import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
-import akka.testkit.TestActorRef;
+import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.LoggerFactory;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class SnapshotManagerTest extends AbstractActorTest {
 
     @Mock
@@ -59,7 +68,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     private RaftActorBehavior mockRaftActorBehavior;
 
     @Mock
-    private Procedure<Void> mockProcedure;
+    private Consumer<Optional<OutputStream>> mockProcedure;
 
     @Mock
     private ElectionTerm mockElectionTerm;
@@ -68,12 +77,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     private TestActorFactory factory;
 
-    private TestActorRef<MessageCollectorActor> actorRef;
+    private ActorRef actorRef;
 
     @Before
-    public void setUp(){
-        MockitoAnnotations.initMocks(this);
-
+    public void setUp() {
         doReturn(false).when(mockRaftActorContext).hasFollowers();
         doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
@@ -81,41 +88,48 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
         doReturn("123").when(mockRaftActorContext).getId();
         doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
+        doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
 
         doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
         doReturn("member5").when(mockElectionTerm).getVotedFor();
 
+        doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
+                .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
+
         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
         factory = new TestActorFactory(getSystem());
 
-        actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+        actorRef = factory.createActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
         doReturn(actorRef).when(mockRaftActorContext).getActor();
 
-        snapshotManager.setCreateSnapshotCallable(mockProcedure);
+        snapshotManager.setCreateSnapshotConsumer(mockProcedure);
     }
 
     @After
-    public void tearDown(){
+    public void tearDown() {
         factory.close();
     }
 
     @Test
-    public void testConstruction(){
+    public void testConstruction() {
         assertEquals(false, snapshotManager.isCapturing());
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
-    public void testCaptureToInstall() throws Exception {
+    public void testCaptureToInstall() {
 
         // Force capturing toInstall = true
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+        snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(0, 1,
                 new MockRaftActorContext.MockPayload()), 0, "follower-1");
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).apply(null);
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", true, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -130,19 +144,22 @@ public class SnapshotManagerTest extends AbstractActorTest {
         //
         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
-        actorRef.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(actorRef);
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
-    public void testCapture() throws Exception {
-        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+    public void testCapture() {
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
 
         assertTrue(capture);
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).apply(null);
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", false, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
@@ -158,75 +175,75 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
 
-        actorRef.underlyingActor().clear();
-
+        MessageCollectorActor.clearMessages(actorRef);
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
-    public void testCaptureWithNullLastLogEntry() throws Exception {
+    public void testCaptureWithNullLastLogEntry() {
         boolean capture = snapshotManager.capture(null, 1);
 
         assertTrue(capture);
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        verify(mockProcedure).apply(null);
+        ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(outputStream.capture());
+        assertEquals("isPresent", false, outputStream.getValue().isPresent());
 
         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
 
-        System.out.println(captureSnapshot);
-
         // LastIndex and LastTerm are picked up from the lastLogEntry
-        assertEquals(-1L, captureSnapshot.getLastIndex());
-        assertEquals(-1L, captureSnapshot.getLastTerm());
+        assertEquals(0, captureSnapshot.getLastIndex());
+        assertEquals(0, captureSnapshot.getLastTerm());
 
         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
-        assertEquals(-1L, captureSnapshot.getLastAppliedIndex());
-        assertEquals(-1L, captureSnapshot.getLastAppliedTerm());
+        assertEquals(0, captureSnapshot.getLastAppliedIndex());
+        assertEquals(0, captureSnapshot.getLastAppliedTerm());
 
         //
         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
-        actorRef.underlyingActor().clear();
-
+        MessageCollectorActor.clearMessages(actorRef);
     }
 
     @Test
-    public void testCaptureWithCreateProcedureError () throws Exception {
-        doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+    public void testCaptureWithCreateProcedureError() {
+        doThrow(new RuntimeException("mock")).when(mockProcedure).accept(any());
 
-        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
 
         assertFalse(capture);
 
         assertEquals(false, snapshotManager.isCapturing());
 
-        verify(mockProcedure).apply(null);
+        verify(mockProcedure).accept(any());
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testIllegalCapture() throws Exception {
-        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+    public void testIllegalCapture() {
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
 
         assertTrue(capture);
 
-        verify(mockProcedure).apply(null);
+        verify(mockProcedure).accept(any());
 
         reset(mockProcedure);
 
         // This will not cause snapshot capture to start again
-        capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+        capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
 
         assertFalse(capture);
 
-        verify(mockProcedure, never()).apply(null);
+        verify(mockProcedure, never()).accept(any());
     }
 
     @Test
-    public void testPersistWhenReplicatedToAllIndexMinusOne(){
+    public void testPersistWhenReplicatedToAllIndexMinusOne() {
         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
 
@@ -234,11 +251,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         doReturn(8L).when(mockRaftActorContext).getLastApplied();
 
-        MockRaftActorContext.MockReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                3L, 9L, new MockRaftActorContext.MockPayload());
+        ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 3L, new MockRaftActorContext.MockPayload());
 
-        MockRaftActorContext.MockReplicatedLogEntry lastAppliedEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                2L, 8L, new MockRaftActorContext.MockPayload());
+        ReplicatedLogEntry lastAppliedEntry = new SimpleReplicatedLogEntry(
+                8L, 2L, new MockRaftActorContext.MockPayload());
 
         doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L);
         doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L);
@@ -246,8 +262,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         // when replicatedToAllIndex = -1
         snapshotManager.capture(lastLogEntry, -1);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+        snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -258,7 +274,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
         assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
         assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
-        assertArrayEquals("getState", bytes, snapshot.getState());
+        assertEquals("getState", snapshotState, snapshot.getState());
         assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
         assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
         assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
@@ -267,20 +283,20 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testPersistWhenReplicatedToAllIndexNotMinus(){
+    public void testPersistWhenReplicatedToAllIndexNotMinus() {
         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
         doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(null).when(mockReplicatedLog).get(0);
         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
         doReturn(6L).when(replicatedLogEntry).getTerm();
         doReturn(9L).when(replicatedLogEntry).getIndex();
 
         // when replicatedToAllIndex != -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), 9);
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+        snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -291,7 +307,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
         assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
         assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
-        assertArrayEquals("getState", bytes, snapshot.getState());
+        assertEquals("getState", snapshotState, snapshot.getState());
         assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
@@ -300,14 +316,13 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+    public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold() {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
-                new MockRaftActorContext.MockPayload()), -1);
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -326,14 +341,15 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         long replicatedToAllIndex = 1;
         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+        doReturn(null).when(mockReplicatedLog).get(0);
         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex);
         doReturn(6L).when(replicatedLogEntry).getTerm();
         doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex();
 
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
                 new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, 2000000L);
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -342,19 +358,29 @@ public class SnapshotManagerTest extends AbstractActorTest {
         verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Test
-    public void testPersistSendInstallSnapshot(){
+    public void testPersistSendInstallSnapshot() throws Exception {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+        doNothing().when(mockProcedure).accept(any());
 
         // when replicatedToAllIndex = -1
-        boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+        boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         assertTrue(capture);
 
-        byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+        ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+
+        ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
+        verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
+
+        Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
+        assertEquals("isPresent", true, installSnapshotStream.isPresent());
 
-        snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        installSnapshotStream.get().write(snapshotState.getBytes());
+
+        snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -369,12 +395,13 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
 
-        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
+        assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
+        assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
     }
 
     @Test
-    public void testCallingPersistWithoutCaptureWillDoNothing(){
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+    public void testCallingPersistWithoutCaptureWillDoNothing() {
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
 
@@ -382,38 +409,35 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
     }
+
     @Test
-    public void testCallingPersistTwiceWillDoNoHarm(){
+    public void testCallingPersistTwiceWillDoNoHarm() {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
-        verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
     }
 
     @Test
-    public void testCommit(){
+    public void testCommit() {
         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L, 1234L);
 
         assertEquals(false, snapshotManager.isCapturing());
 
@@ -421,24 +445,23 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockDataPersistenceProvider).deleteMessages(50L);
 
-        ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+        ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor =
+                ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
 
         verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
 
-        assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
-                                                                     // config snapShotBatchCount = 10
-                                                                     // therefore maxSequenceNumber = 90
+        assertEquals(Long.MAX_VALUE, criteriaCaptor.getValue().maxSequenceNr());
+        assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
 
         MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
     }
 
     @Test
-    public void testCommitBeforePersist(){
+    public void testCommitBeforePersist() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L, 0);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -449,8 +472,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCommitBeforeCapture(){
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+    public void testCommitBeforeCapture() {
+        snapshotManager.commit(100L, 0);
 
         verify(mockReplicatedLog, never()).snapshotCommit();
 
@@ -461,18 +484,17 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testCallingCommitMultipleTimesCausesNoHarm(){
+    public void testCallingCommitMultipleTimesCausesNoHarm() {
         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L, 0);
 
-        snapshotManager.commit(100L, mockRaftActorBehavior);
+        snapshotManager.commit(100L, 0);
 
         verify(mockReplicatedLog, times(1)).snapshotCommit();
 
@@ -482,12 +504,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRollback(){
+    public void testRollback() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -498,10 +519,9 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
 
     @Test
-    public void testRollbackBeforePersist(){
+    public void testRollbackBeforePersist() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
         snapshotManager.rollback();
 
@@ -509,19 +529,18 @@ public class SnapshotManagerTest extends AbstractActorTest {
     }
 
     @Test
-    public void testRollbackBeforeCapture(){
+    public void testRollbackBeforeCapture() {
         snapshotManager.rollback();
 
         verify(mockReplicatedLog, never()).snapshotRollback();
     }
 
     @Test
-    public void testCallingRollbackMultipleTimesCausesNoHarm(){
+    public void testCallingRollbackMultipleTimesCausesNoHarm() {
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+        snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
 
-        snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+        snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -536,14 +555,14 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
         doReturn(true).when(mockReplicatedLog).isPresent(10);
-        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+        doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", 10L, retIndex);
 
         verify(mockReplicatedLog).snapshotPreCommit(10, 5);
-        verify(mockReplicatedLog).snapshotCommit();
+        verify(mockReplicatedLog).snapshotCommit(false);
 
         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
     }
@@ -552,16 +571,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testTrimLogWhenLastAppliedNotSet() {
         doReturn(-1L).when(mockRaftActorContext).getLastApplied();
 
-        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
-        doReturn(true).when(mockReplicatedLog).isPresent(10);
-        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
-        doReturn(5L).when(replicatedLogEntry).getTerm();
-
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", -1L, retIndex);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
-        verify(mockReplicatedLog, never()).snapshotCommit();
+        verify(mockReplicatedLog, never()).snapshotCommit(false);
 
         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
     }
@@ -570,16 +584,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testTrimLogWhenLastAppliedZero() {
         doReturn(0L).when(mockRaftActorContext).getLastApplied();
 
-        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
-        doReturn(true).when(mockReplicatedLog).isPresent(10);
-        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
-        doReturn(5L).when(replicatedLogEntry).getTerm();
-
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", -1L, retIndex);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
-        verify(mockReplicatedLog, never()).snapshotCommit();
+        verify(mockReplicatedLog, never()).snapshotCommit(false);
 
         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
     }
@@ -590,58 +599,44 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         doReturn(false).when(mockReplicatedLog).isPresent(10);
 
-        long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+        long retIndex = snapshotManager.trimLog(10);
         assertEquals("return index", -1L, retIndex);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
-        verify(mockReplicatedLog, never()).snapshotCommit();
+        verify(mockReplicatedLog, never()).snapshotCommit(false);
 
         // Trim index is greater than replicatedToAllIndex so should update it.
         verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L);
     }
 
     @Test
-    public void testTrimLogAfterCapture(){
-        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+    public void testTrimLogAfterCapture() {
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
                 new MockRaftActorContext.MockPayload()), 9);
 
         assertTrue(capture);
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
-        doReturn(20L).when(mockRaftActorContext).getLastApplied();
-        doReturn(true).when(mockReplicatedLog).isPresent(10);
-        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
-        doReturn(5L).when(replicatedLogEntry).getTerm();
-
-        snapshotManager.trimLog(10, mockRaftActorBehavior);
+        snapshotManager.trimLog(10);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
-        verify(mockReplicatedLog, never()).snapshotCommit();
-
+        verify(mockReplicatedLog, never()).snapshotCommit(false);
     }
 
     @Test
-    public void testTrimLogAfterCaptureToInstall(){
-        boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
-                new MockRaftActorContext.MockPayload()), 9, "follower-1");
+    public void testTrimLogAfterCaptureToInstall() {
+        boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+                new MockRaftActorContext.MockPayload()), 9);
 
         assertTrue(capture);
 
         assertEquals(true, snapshotManager.isCapturing());
 
-        ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
-        doReturn(20L).when(mockRaftActorContext).getLastApplied();
-        doReturn(true).when(mockReplicatedLog).isPresent(10);
-        doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
-        doReturn(5L).when(replicatedLogEntry).getTerm();
-
-        snapshotManager.trimLog(10, mockRaftActorBehavior);
+        snapshotManager.trimLog(10);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
         verify(mockReplicatedLog, never()).snapshotCommit();
-
     }
 
     @Test
@@ -652,7 +647,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(4L).when(mockReplicatedLog).getSnapshotTerm();
         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
 
-        ReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(6L, 9L,
+        ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 6L,
                 new MockRaftActorContext.MockPayload());
 
         // No followers and valid lastLogEntry
@@ -668,7 +663,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getIndex", -1L, reader.getIndex());
 
         // Followers and valid originalIndex entry
-        doReturn(new MockRaftActorContext.MockReplicatedLogEntry(5L, 8L,
+        doReturn(new SimpleReplicatedLogEntry(8L, 5L,
                 new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L);
         reader.init(mockReplicatedLog, 8L, lastLogEntry, true);