+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.raft;
-import static junit.framework.TestCase.assertFalse;
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorRef;
-import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.testkit.TestActorRef;
+import java.io.OutputStream;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
private RaftActorBehavior mockRaftActorBehavior;
@Mock
- private Procedure<Void> mockProcedure;
+ private Consumer<Optional<OutputStream>> mockProcedure;
+
+ @Mock
+ private ElectionTerm mockElectionTerm;
private SnapshotManager snapshotManager;
private TestActorFactory factory;
- private TestActorRef<MessageCollectorActor> actorRef;
+ private ActorRef actorRef;
@Before
- public void setUp(){
+ public void setUp() {
MockitoAnnotations.initMocks(this);
- doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(false).when(mockRaftActorContext).hasFollowers();
doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+ doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
doReturn("123").when(mockRaftActorContext).getId();
+ doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
+ doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
doReturn("123").when(mockRaftActorBehavior).getLeaderId();
+ doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
+ doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn("member5").when(mockElectionTerm).getVotedFor();
+
+ doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
+ .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
+
snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
factory = new TestActorFactory(getSystem());
- actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+ actorRef = factory.createActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
+ snapshotManager.setCreateSnapshotConsumer(mockProcedure);
}
@After
- public void tearDown(){
+ public void tearDown() {
factory.close();
}
@Test
- public void testConstruction(){
+ public void testConstruction() {
assertEquals(false, snapshotManager.isCapturing());
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testCaptureToInstall(){
+ public void testCaptureToInstall() {
// Force capturing toInstall = true
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(0, 1,
new MockRaftActorContext.MockPayload()), 0, "follower-1");
assertEquals(true, snapshotManager.isCapturing());
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", true, outputStream.getValue().isPresent());
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(0L, captureSnapshot.getLastIndex());
//
assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
- actorRef.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(actorRef);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testCapture(){
- boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ public void testCapture() {
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
new MockRaftActorContext.MockPayload()), 9);
assertTrue(capture);
assertEquals(true, snapshotManager.isCapturing());
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", false, outputStream.getValue().isPresent());
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
// LastIndex and LastTerm are picked up from the lastLogEntry
assertEquals(9L, captureSnapshot.getLastIndex());
assertEquals(1L, captureSnapshot.getLastTerm());
assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
- actorRef.underlyingActor().clear();
-
+ MessageCollectorActor.clearMessages(actorRef);
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testIllegalCapture() throws Exception {
- boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
- new MockRaftActorContext.MockPayload()), 9);
+ public void testCaptureWithNullLastLogEntry() {
+ boolean capture = snapshotManager.capture(null, 1);
assertTrue(capture);
- List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+ assertEquals(true, snapshotManager.isCapturing());
- assertEquals(1, allMatching.size());
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", false, outputStream.getValue().isPresent());
- // This will not cause snapshot capture to start again
- capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
- new MockRaftActorContext.MockPayload()), 9);
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
- assertFalse(capture);
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(-1L, captureSnapshot.getLastIndex());
+ assertEquals(-1L, captureSnapshot.getLastTerm());
- allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(-1L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(-1L, captureSnapshot.getLastAppliedTerm());
- assertEquals(1, allMatching.size());
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+ MessageCollectorActor.clearMessages(actorRef);
}
@Test
- public void testPersistWhenReplicatedToAllIndexMinusOne(){
- doReturn("123").when(mockRaftActorContext).getId();
- doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
- doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
+ public void testCaptureWithCreateProcedureError() {
+ doThrow(new RuntimeException("mock")).when(mockProcedure).accept(any());
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+ new MockRaftActorContext.MockPayload()), 9);
- snapshotManager.create(mockProcedure);
+ assertFalse(capture);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ assertEquals(false, snapshotManager.isCapturing());
- ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
- verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
+ verify(mockProcedure).accept(any());
+ }
- Snapshot snapshot = snapshotArgumentCaptor.getValue();
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testIllegalCapture() {
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+ new MockRaftActorContext.MockPayload()), 9);
- assertEquals(6, snapshot.getLastAppliedTerm());
- assertEquals(9, snapshot.getLastAppliedIndex());
- assertEquals(9, snapshot.getLastIndex());
- assertEquals(6, snapshot.getLastTerm());
- assertEquals(10, snapshot.getState().length);
- assertTrue(Arrays.equals(bytes, snapshot.getState()));
- assertEquals(0, snapshot.getUnAppliedEntries().size());
+ assertTrue(capture);
- verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
- }
+ verify(mockProcedure).accept(any());
+ reset(mockProcedure);
- @Test
- public void testCreate() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
+ // This will not cause snapshot capture to start again
+ capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+ new MockRaftActorContext.MockPayload()), 9);
- snapshotManager.create(mockProcedure);
+ assertFalse(capture);
- verify(mockProcedure).apply(null);
+ verify(mockProcedure, never()).accept(any());
}
@Test
- public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
- // when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
+ public void testPersistWhenReplicatedToAllIndexMinusOne() {
+ doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
- snapshotManager.create(mockProcedure);
+ doReturn(true).when(mockRaftActorContext).hasFollowers();
- snapshotManager.create(mockProcedure);
+ doReturn(8L).when(mockRaftActorContext).getLastApplied();
- verify(mockProcedure, times(1)).apply(null);
- }
+ ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 3L, new MockRaftActorContext.MockPayload());
- @Test
- public void testCallingCreateBeforeCapture() throws Exception {
- snapshotManager.create(mockProcedure);
+ ReplicatedLogEntry lastAppliedEntry = new SimpleReplicatedLogEntry(
+ 8L, 2L, new MockRaftActorContext.MockPayload());
- verify(mockProcedure, times(0)).apply(null);
- }
+ doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L);
+ doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L);
- @Test
- public void testCallingCreateAfterPersist() throws Exception {
// when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
+ snapshotManager.capture(lastLogEntry, -1);
- snapshotManager.create(mockProcedure);
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+ snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
- verify(mockProcedure, times(1)).apply(null);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+ verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
- reset(mockProcedure);
+ Snapshot snapshot = snapshotArgumentCaptor.getValue();
- snapshotManager.create(mockProcedure);
+ assertEquals("getLastTerm", 3L, snapshot.getLastTerm());
+ assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
+ assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
+ assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
+ assertEquals("getState", snapshotState, snapshot.getState());
+ assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
+ assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
+ assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
- verify(mockProcedure, never()).apply(null);
+ verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
}
@Test
- public void testPersistWhenReplicatedToAllIndexNotMinus(){
+ public void testPersistWhenReplicatedToAllIndexNotMinus() {
doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(9L).when(replicatedLogEntry).getIndex();
// when replicatedToAllIndex != -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), 9);
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
+
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+ snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.create(mockProcedure);
+ ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
+ verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ Snapshot snapshot = snapshotArgumentCaptor.getValue();
- verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+ assertEquals("getLastTerm", 6L, snapshot.getLastTerm());
+ assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
+ assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
+ assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
+ assertEquals("getState", snapshotState, snapshot.getState());
+ assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
}
-
@Test
- public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
+ public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold() {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
- snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
- new MockRaftActorContext.MockPayload()), -1);
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.create(mockProcedure);
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testPersistWhenReplicatedLogSizeExceedsSnapshotBatchCount() {
+ doReturn(10L).when(mockReplicatedLog).size(); // matches snapshotBatchCount
+ doReturn(100).when(mockReplicatedLog).dataSize();
+
+ doReturn(5L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(5L).when(mockReplicatedLog).getSnapshotTerm();
+
+ long replicatedToAllIndex = 1;
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex);
+ doReturn(6L).when(replicatedLogEntry).getTerm();
+ doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex();
+
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
+ new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
+
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testPersistSendInstallSnapshot(){
+ public void testPersistSendInstallSnapshot() throws Exception {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+ doNothing().when(mockProcedure).accept(any());
// when replicatedToAllIndex = -1
- boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
assertTrue(capture);
- snapshotManager.create(mockProcedure);
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+
+ ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
+ assertEquals("isPresent", true, installSnapshotStream.isPresent());
- snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ installSnapshotStream.get().write(snapshotState.getBytes());
+
+ snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
+
+ assertEquals(true, snapshotManager.isCapturing());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
- assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+ assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
+ assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
}
@Test
- public void testCallingPersistWithoutCaptureWillDoNothing(){
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ public void testCallingPersistWithoutCaptureWillDoNothing() {
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
}
+
@Test
- public void testCallingPersistTwiceWillDoNoHarm(){
+ public void testCallingPersistTwiceWillDoNoHarm() {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
-
- snapshotManager.create(mockProcedure);
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
- verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
}
@Test
- public void testCommit(){
+ public void testCommit() {
+ doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
+
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.create(mockProcedure);
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ assertEquals(true, snapshotManager.isCapturing());
+
+ snapshotManager.commit(100L, 1234L);
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ assertEquals(false, snapshotManager.isCapturing());
verify(mockReplicatedLog).snapshotCommit();
- verify(mockDataPersistenceProvider).deleteMessages(100L);
+ verify(mockDataPersistenceProvider).deleteMessages(50L);
- ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
+ ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor =
+ ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
- assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
- // config snapShotBatchCount = 10
- // therefore maxSequenceNumber = 90
+ assertEquals(100L, criteriaCaptor.getValue().maxSequenceNr());
+ assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
+
+ MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
@Test
- public void testCommitBeforePersist(){
+ public void testCommitBeforePersist() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
}
@Test
- public void testCommitBeforeCapture(){
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ public void testCommitBeforeCapture() {
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
}
@Test
- public void testCallingCommitMultipleTimesCausesNoHarm(){
- // when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ public void testCallingCommitMultipleTimesCausesNoHarm() {
+ doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
- snapshotManager.create(mockProcedure);
+ // when replicatedToAllIndex = -1
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L, 0);
- snapshotManager.commit(mockDataPersistenceProvider, 100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, times(1)).snapshotCommit();
- verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
+ verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
}
@Test
- public void testRollback(){
+ public void testRollback() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
-
- snapshotManager.create(mockProcedure);
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
verify(mockReplicatedLog).snapshotRollback();
+
+ MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
@Test
- public void testRollbackBeforePersist(){
+ public void testRollbackBeforePersist() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
snapshotManager.rollback();
}
@Test
- public void testRollbackBeforeCapture(){
+ public void testRollbackBeforeCapture() {
snapshotManager.rollback();
verify(mockReplicatedLog, never()).snapshotRollback();
}
@Test
- public void testCallingRollbackMultipleTimesCausesNoHarm(){
+ public void testCallingRollbackMultipleTimesCausesNoHarm() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.create(mockProcedure);
-
- snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
- , Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
}
@Test
- public void testTrimLog(){
- ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
- ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ public void testTrimLogWhenTrimIndexLessThanLastApplied() {
doReturn(20L).when(mockRaftActorContext).getLastApplied();
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
- doReturn(5L).when(mockElectionTerm).getCurrentTerm();
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
+ assertEquals("return index", 10L, retIndex);
verify(mockReplicatedLog).snapshotPreCommit(10, 5);
verify(mockReplicatedLog).snapshotCommit();
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testTrimLogWhenLastAppliedNotSet() {
+ doReturn(-1L).when(mockRaftActorContext).getLastApplied();
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ long retIndex = snapshotManager.trimLog(10);
+ assertEquals("return index", -1L, retIndex);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
}
@Test
- public void testTrimLogAfterCapture(){
- boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+ public void testTrimLogWhenLastAppliedZero() {
+ doReturn(0L).when(mockRaftActorContext).getLastApplied();
+
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(true).when(mockReplicatedLog).isPresent(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
+ doReturn(5L).when(replicatedLogEntry).getTerm();
+
+ long retIndex = snapshotManager.trimLog(10);
+ assertEquals("return index", -1L, retIndex);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testTrimLogWhenTrimIndexNotPresent() {
+ doReturn(20L).when(mockRaftActorContext).getLastApplied();
+
+ doReturn(false).when(mockReplicatedLog).isPresent(10);
+
+ long retIndex = snapshotManager.trimLog(10);
+ assertEquals("return index", -1L, retIndex);
+
+ verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
+ verify(mockReplicatedLog, never()).snapshotCommit();
+
+ // Trim index is greater than replicatedToAllIndex so should update it.
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L);
+ }
+
+ @Test
+ public void testTrimLogAfterCapture() {
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
new MockRaftActorContext.MockPayload()), 9);
assertTrue(capture);
assertEquals(true, snapshotManager.isCapturing());
- ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(20L).when(mockRaftActorContext).getLastApplied();
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
- doReturn(5L).when(mockElectionTerm).getCurrentTerm();
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ snapshotManager.trimLog(10);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
verify(mockReplicatedLog, never()).snapshotCommit();
}
@Test
- public void testTrimLogAfterCaptureToInstall(){
- boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
- new MockRaftActorContext.MockPayload()), 9, "follower-1");
+ public void testTrimLogAfterCaptureToInstall() {
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+ new MockRaftActorContext.MockPayload()), 9);
assertTrue(capture);
assertEquals(true, snapshotManager.isCapturing());
- ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
doReturn(20L).when(mockRaftActorContext).getLastApplied();
doReturn(true).when(mockReplicatedLog).isPresent(10);
- doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
- doReturn(5L).when(mockElectionTerm).getCurrentTerm();
- doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ snapshotManager.trimLog(10);
verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
verify(mockReplicatedLog, never()).snapshotCommit();
}
-}
\ No newline at end of file
+ @Test
+ public void testLastAppliedTermInformationReader() {
+
+ LastAppliedTermInformationReader reader = new LastAppliedTermInformationReader();
+
+ doReturn(4L).when(mockReplicatedLog).getSnapshotTerm();
+ doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
+
+ ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 6L,
+ new MockRaftActorContext.MockPayload());
+
+ // No followers and valid lastLogEntry
+ reader.init(mockReplicatedLog, 1L, lastLogEntry, false);
+
+ assertEquals("getTerm", 6L, reader.getTerm());
+ assertEquals("getIndex", 9L, reader.getIndex());
+
+ // No followers and null lastLogEntry
+ reader.init(mockReplicatedLog, 1L, null, false);
+
+ assertEquals("getTerm", -1L, reader.getTerm());
+ assertEquals("getIndex", -1L, reader.getIndex());
+
+ // Followers and valid originalIndex entry
+ doReturn(new SimpleReplicatedLogEntry(8L, 5L,
+ new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L);
+ reader.init(mockReplicatedLog, 8L, lastLogEntry, true);
+
+ assertEquals("getTerm", 5L, reader.getTerm());
+ assertEquals("getIndex", 8L, reader.getIndex());
+
+ // Followers and null originalIndex entry and valid snapshot index
+ reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
+
+ assertEquals("getTerm", 4L, reader.getTerm());
+ assertEquals("getIndex", 7L, reader.getIndex());
+
+ // Followers and null originalIndex entry and invalid snapshot index
+ doReturn(-1L).when(mockReplicatedLog).getSnapshotIndex();
+ reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
+
+ assertEquals("getTerm", -1L, reader.getTerm());
+ assertEquals("getIndex", -1L, reader.getIndex());
+ }
+}