From: Tom Pantelis Date: Thu, 26 Mar 2015 19:22:37 +0000 (+0000) Subject: Merge "Refactor snapshot code" X-Git-Tag: release/lithium~347 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=c747a3f05a7f68f0e2e35720db8ffab6a59827fb;hp=ba29dacfce1ac56bbd26574520b67bca545d6c5d Merge "Refactor snapshot code" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java new file mode 100644 index 0000000000..cb38e82ac3 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -0,0 +1,398 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +public class SnapshotManager implements SnapshotState { + + + private final SnapshotState IDLE = new Idle(); + private final SnapshotState CAPTURING = new Capturing(); + private final SnapshotState PERSISTING = new Persisting(); + private final SnapshotState CREATING = new Creating(); + + private final Logger LOG; + private final RaftActorContext context; + private final LastAppliedTermInformationReader lastAppliedTermInformationReader = + new LastAppliedTermInformationReader(); + private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader = + new ReplicatedToAllTermInformationReader(); + + + private SnapshotState currentState = IDLE; + private CaptureSnapshot captureSnapshot; + + public SnapshotManager(RaftActorContext context, Logger logger) { + this.context = context; + this.LOG = logger; + } + + @Override + public boolean isCapturing() { + return currentState.isCapturing(); + } + + @Override + public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + currentState.captureToInstall(lastLogEntry, replicatedToAllIndex); + } + + @Override + public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + currentState.capture(lastLogEntry, replicatedToAllIndex); + } + + @Override + public void create(Procedure callback) { + currentState.create(callback); + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) { + currentState.persist(persistenceProvider, snapshotBytes, currentBehavior); + } + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + currentState.commit(persistenceProvider, sequenceNumber); + } + + @Override + public void rollback() { + currentState.rollback(); + } + + @Override + public long trimLog(long desiredTrimIndex) { + return currentState.trimLog(desiredTrimIndex); + } + + private boolean hasFollowers(){ + return context.getPeerAddresses().keySet().size() > 0; + } + + private String persistenceId(){ + return context.getId(); + } + + private class AbstractSnapshotState implements SnapshotState { + + @Override + public boolean isCapturing() { + return false; + } + + @Override + public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + LOG.debug("capture should not be called in state {}", this); + } + + @Override + public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + LOG.debug("captureToInstall should not be called in state {}", this); + } + + @Override + public void create(Procedure callback) { + LOG.debug("create should not be called in state {}", this); + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) { + LOG.debug("persist should not be called in state {}", this); + } + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + LOG.debug("commit should not be called in state {}", this); + } + + @Override + public void rollback() { + LOG.debug("rollback should not be called in state {}", this); + } + + @Override + public long trimLog(long desiredTrimIndex) { + LOG.debug("trimLog should not be called in state {}", this); + return -1; + } + + protected long doTrimLog(long desiredTrimIndex){ + // we would want to keep the lastApplied as its used while capturing snapshots + long lastApplied = context.getLastApplied(); + long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1)); + + if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) { + LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin, + context.getTermInformation().getCurrentTerm()); + + //use the term of the temp-min, since we check for isPresent, entry will not be null + ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin); + context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm()); + context.getReplicatedLog().snapshotCommit(); + return tempMin; + } + + return -1; + } + } + + private class Idle extends AbstractSnapshotState { + + private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) { + TermInformationReader lastAppliedTermInfoReader = + lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), + lastLogEntry, hasFollowers()); + + long lastAppliedIndex = lastAppliedTermInfoReader.getIndex(); + long lastAppliedTerm = lastAppliedTermInfoReader.getTerm(); + + TermInformationReader replicatedToAllTermInfoReader = + replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex); + + long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex(); + long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm(); + + // send a CaptureSnapshot to self to make the expensive operation async. + captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(), + lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm, + newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall); + + SnapshotManager.this.currentState = CAPTURING; + + LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "", + captureSnapshot); + + context.getActor().tell(captureSnapshot, context.getActor()); + } + + @Override + public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + capture(lastLogEntry, replicatedToAllIndex, false); + } + + @Override + public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) { + capture(lastLogEntry, replicatedToAllIndex, true); + } + + @Override + public String toString() { + return "Idle"; + } + + @Override + public long trimLog(long desiredTrimIndex) { + return doTrimLog(desiredTrimIndex); + } + } + + private class Capturing extends AbstractSnapshotState { + + @Override + public boolean isCapturing() { + return true; + } + + @Override + public void create(Procedure callback) { + try { + callback.apply(null); + SnapshotManager.this.currentState = CREATING; + } catch (Exception e) { + LOG.error("Unexpected error occurred", e); + } + } + + @Override + public String toString() { + return "Capturing"; + } + + } + + private class Creating extends AbstractSnapshotState { + + @Override + public boolean isCapturing() { + return true; + } + + @Override + public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, + RaftActorBehavior currentBehavior) { + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + + Snapshot sn = Snapshot.create(snapshotBytes, + context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1), + captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), + captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + + persistenceProvider.saveSnapshot(sn); + + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); + + long dataThreshold = Runtime.getRuntime().totalMemory() * + context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; + if (context.getReplicatedLog().dataSize() > dataThreshold) { + // if memory is less, clear the log based on lastApplied. + // this could/should only happen if one of the followers is down + // as normally we keep removing from the log when its replicated to all. + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ + // clear the log based on replicatedToAllIndex + context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), + captureSnapshot.getReplicatedToAllTerm()); + + currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } else { + // The replicatedToAllIndex was not found in the log + // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot. + // In this scenario we may need to save the snapshot to the akka persistence + // snapshot for recovery but we do not need to do the replicated log trimming. + context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); + } + + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + + "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), + captureSnapshot.getLastAppliedTerm()); + + if (context.getId().equals(currentBehavior.getLeaderId()) + && captureSnapshot.isInstallSnapshotInitiated()) { + // this would be call straight to the leader and won't initiate in serialization + currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot( + ByteString.copyFrom(snapshotBytes))); + } + + captureSnapshot = null; + SnapshotManager.this.currentState = PERSISTING; + } + + @Override + public String toString() { + return "Creating"; + } + + } + + private class Persisting extends AbstractSnapshotState { + + @Override + public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) { + context.getReplicatedLog().snapshotCommit(); + persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria( + sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); + + persistenceProvider.deleteMessages(sequenceNumber); + + SnapshotManager.this.currentState = IDLE; + } + + @Override + public void rollback() { + context.getReplicatedLog().snapshotRollback(); + + LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." + + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().size()); + + SnapshotManager.this.currentState = IDLE; + } + + @Override + public String toString() { + return "Persisting"; + } + + } + + private static interface TermInformationReader { + long getIndex(); + long getTerm(); + } + + private static class LastAppliedTermInformationReader implements TermInformationReader{ + private long index; + private long term; + + public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex, + ReplicatedLogEntry lastLogEntry, boolean hasFollowers){ + ReplicatedLogEntry entry = log.get(originalIndex); + this.index = -1L; + this.term = -1L; + if (!hasFollowers) { + if(lastLogEntry != null) { + index = lastLogEntry.getIndex(); + term = lastLogEntry.getTerm(); + } + } else if (entry != null) { + index = entry.getIndex(); + term = entry.getTerm(); + } else if(originalIndex == log.getSnapshotIndex()){ + index = log.getSnapshotIndex(); + term = log.getSnapshotTerm(); + } + return this; + } + + @Override + public long getIndex(){ + return this.index; + } + + @Override + public long getTerm(){ + return this.term; + } + } + + private static class ReplicatedToAllTermInformationReader implements TermInformationReader{ + private long index; + private long term; + + ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){ + ReplicatedLogEntry entry = log.get(originalIndex); + this.index = -1L; + this.term = -1L; + + if (entry != null) { + index = entry.getIndex(); + term = entry.getTerm(); + } + + return this; + } + + @Override + public long getIndex(){ + return this.index; + } + + @Override + public long getTerm(){ + return this.term; + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java new file mode 100644 index 0000000000..2ff30ec53b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.japi.Procedure; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; + +public interface SnapshotState { + /** + * Should return true when a snapshot is being captured + * @return + */ + boolean isCapturing(); + + /** + * Initiate capture snapshot + * + * @param lastLogEntry the last entry in the replicated log + * @param replicatedToAllIndex the current replicatedToAllIndex + */ + void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + + /** + * Initiate capture snapshot for the purposing of installing that snapshot + * + * @param lastLogEntry + * @param replicatedToAllIndex + */ + void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex); + + /** + * Create the snapshot + * + * @param callback a procedure to be called which should create the snapshot + */ + void create(Procedure callback); + + /** + * Persist the snapshot + * + * @param persistenceProvider + * @param snapshotBytes + * @param currentBehavior + */ + void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior); + + /** + * Commit the snapshot by trimming the log + * + * @param persistenceProvider + * @param sequenceNumber + */ + void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber); + + /** + * Rollback the snapshot + */ + void rollback(); + + /** + * Trim the log + * + * @param desiredTrimIndex + * @return the actual trim index + */ + long trimLog(long desiredTrimIndex); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java new file mode 100644 index 0000000000..90272fec98 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -0,0 +1,528 @@ +package org.opendaylight.controller.cluster.raft; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.japi.Procedure; +import akka.persistence.SnapshotSelectionCriteria; +import akka.testkit.TestActorRef; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.LoggerFactory; + +public class SnapshotManagerTest extends AbstractActorTest { + + @Mock + private RaftActorContext mockRaftActorContext; + + @Mock + private ConfigParams mockConfigParams; + + @Mock + private ReplicatedLog mockReplicatedLog; + + @Mock + private DataPersistenceProvider mockDataPersistenceProvider; + + @Mock + private RaftActorBehavior mockRaftActorBehavior; + + @Mock + private Procedure mockProcedure; + + private SnapshotManager snapshotManager; + + private TestActorFactory factory; + + private TestActorRef actorRef; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses(); + doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams(); + doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); + doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); + doReturn("123").when(mockRaftActorContext).getId(); + doReturn("123").when(mockRaftActorBehavior).getLeaderId(); + + snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass())); + factory = new TestActorFactory(getSystem()); + + actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-")); + doReturn(actorRef).when(mockRaftActorContext).getActor(); + + } + + @After + public void tearDown(){ + factory.close(); + } + + @Test + public void testConstruction(){ + assertEquals(false, snapshotManager.isCapturing()); + } + + @Test + public void testCaptureToInstall(){ + + // Force capturing toInstall = true + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + new MockRaftActorContext.MockPayload()), 0); + + assertEquals(true, snapshotManager.isCapturing()); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(0L, captureSnapshot.getLastIndex()); + assertEquals(1L, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(0L, captureSnapshot.getLastAppliedIndex()); + assertEquals(1L, captureSnapshot.getLastAppliedTerm()); + + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + actorRef.underlyingActor().clear(); + } + + @Test + public void testCapture(){ + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertEquals(true, snapshotManager.isCapturing()); + + CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class); + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(9L, captureSnapshot.getLastIndex()); + assertEquals(1L, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(9L, captureSnapshot.getLastAppliedIndex()); + assertEquals(1L, captureSnapshot.getLastAppliedTerm()); + + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + + actorRef.underlyingActor().clear(); + + } + + @Test + public void testIllegalCapture() throws Exception { + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + List allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + + assertEquals(1, allMatching.size()); + + // This will not cause snapshot capture to start again + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class); + + assertEquals(1, allMatching.size()); + } + + @Test + public void testPersistWhenReplicatedToAllIndexMinusOne(){ + doReturn("123").when(mockRaftActorContext).getId(); + doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); + + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior); + + ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); + verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); + + Snapshot snapshot = snapshotArgumentCaptor.getValue(); + + assertEquals(6, snapshot.getLastAppliedTerm()); + assertEquals(9, snapshot.getLastAppliedIndex()); + assertEquals(9, snapshot.getLastIndex()); + assertEquals(6, snapshot.getLastTerm()); + assertEquals(10, snapshot.getState().length); + assertTrue(Arrays.equals(bytes, snapshot.getState())); + assertEquals(0, snapshot.getUnAppliedEntries().size()); + + verify(mockReplicatedLog).snapshotPreCommit(45L, 6L); + } + + + @Test + public void testCreate() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure).apply(null); + } + + @Test + public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(1)).apply(null); + } + + @Test + public void testCallingCreateBeforeCapture() throws Exception { + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(0)).apply(null); + } + + @Test + public void testCallingCreateAfterPersist() throws Exception { + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, times(1)).apply(null); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + Mockito.reset(mockProcedure); + + snapshotManager.create(mockProcedure); + + verify(mockProcedure, never()).apply(null); + } + + @Test + public void testPersistWhenReplicatedToAllIndexNotMinus(){ + doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9); + doReturn(6L).when(replicatedLogEntry).getTerm(); + doReturn(9L).when(replicatedLogEntry).getIndex(); + + // when replicatedToAllIndex != -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), 9); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).setReplicatedToAllIndex(9); + } + + + @Test + public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1); + } + + @Test + public void testPersistSendInstallSnapshot(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + + snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + ArgumentCaptor sendInstallSnapshotArgumentCaptor + = ArgumentCaptor.forClass(SendInstallSnapshot.class); + + verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture()); + + SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue(); + + assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray())); + } + + @Test + public void testCallingPersistWithoutCaptureWillDoNothing(){ + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); + } + @Test + public void testCallingPersistTwiceWillDoNoHarm(){ + doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); + } + + @Test + public void testCommit(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog).snapshotCommit(); + + verify(mockDataPersistenceProvider).deleteMessages(100L); + + ArgumentCaptor criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); + + verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture()); + + assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100 + // config snapShotBatchCount = 10 + // therefore maxSequenceNumber = 90 + } + + @Test + public void testCommitBeforePersist(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockDataPersistenceProvider, never()).deleteMessages(100L); + + verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + } + + @Test + public void testCommitBeforeCapture(){ + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, never()).snapshotCommit(); + + verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong()); + + verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + + } + + @Test + public void testCallingCommitMultipleTimesCausesNoHarm(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + snapshotManager.commit(mockDataPersistenceProvider, 100L); + + verify(mockReplicatedLog, times(1)).snapshotCommit(); + + verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L); + + verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class)); + } + + @Test + public void testRollback(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + snapshotManager.rollback(); + + verify(mockReplicatedLog).snapshotRollback(); + } + + + @Test + public void testRollbackBeforePersist(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.rollback(); + + verify(mockReplicatedLog, never()).snapshotRollback(); + } + + @Test + public void testRollbackBeforeCapture(){ + snapshotManager.rollback(); + + verify(mockReplicatedLog, never()).snapshotRollback(); + } + + @Test + public void testCallingRollbackMultipleTimesCausesNoHarm(){ + // when replicatedToAllIndex = -1 + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.create(mockProcedure); + + snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior); + + snapshotManager.rollback(); + + snapshotManager.rollback(); + + verify(mockReplicatedLog, times(1)).snapshotRollback(); + } + + @Test + public void testTrimLog(){ + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10); + + verify(mockReplicatedLog).snapshotPreCommit(10, 5); + verify(mockReplicatedLog).snapshotCommit(); + } + + @Test + public void testTrimLogAfterCapture(){ + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertEquals(true, snapshotManager.isCapturing()); + + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10); + + verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); + verify(mockReplicatedLog, never()).snapshotCommit(); + + } + + @Test + public void testTrimLogAfterCaptureToInstall(){ + snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + new MockRaftActorContext.MockPayload()), 9); + + assertEquals(true, snapshotManager.isCapturing()); + + ElectionTerm mockElectionTerm = mock(ElectionTerm.class); + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(20L).when(mockRaftActorContext).getLastApplied(); + doReturn(true).when(mockReplicatedLog).isPresent(10); + doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); + doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(5L).when(replicatedLogEntry).getTerm(); + + snapshotManager.trimLog(10); + + verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5); + verify(mockReplicatedLog, never()).snapshotCommit(); + + } + +} \ No newline at end of file