X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FSnapshotManagerTest.java;h=4cdec8ca672c87fb091f176185e18837c52e0a4f;hp=89aadbe0ae0fb4dc0ba7646e24942a7323be0376;hb=ff29db5dc6012f77bbe53f57ddce929b0de093b3;hpb=c9ea87d45243f611570839ce6cf588c8e130f864 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 89aadbe0ae..4cdec8ca67 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -1,11 +1,20 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -13,13 +22,13 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; + import akka.actor.ActorRef; -import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; -import akka.testkit.TestActorRef; -import com.google.common.collect.ImmutableMap; +import java.io.OutputStream; import java.util.Arrays; -import java.util.HashMap; +import java.util.Optional; +import java.util.function.Consumer; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -27,10 +36,15 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.LoggerFactory; @@ -52,59 +66,70 @@ public class SnapshotManagerTest extends AbstractActorTest { private RaftActorBehavior mockRaftActorBehavior; @Mock - private Procedure mockProcedure; + private Consumer> mockProcedure; + + @Mock + private ElectionTerm mockElectionTerm; private SnapshotManager snapshotManager; private TestActorFactory factory; - private TestActorRef actorRef; + private ActorRef actorRef; @Before - public void setUp(){ + public void setUp() { MockitoAnnotations.initMocks(this); - doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses(); + doReturn(false).when(mockRaftActorContext).hasFollowers(); doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams(); doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); + doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage(); doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); doReturn("123").when(mockRaftActorContext).getId(); doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider(); + doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior(); doReturn("123").when(mockRaftActorBehavior).getLeaderId(); - ElectionTerm mockElectionTerm = mock(ElectionTerm.class); doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation(); doReturn(5L).when(mockElectionTerm).getCurrentTerm(); + doReturn("member5").when(mockElectionTerm).getVotedFor(); + + doReturn(new FileBackedOutputStreamFactory(10000000, "target")) + .when(mockRaftActorContext).getFileBackedOutputStreamFactory(); snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass())); factory = new TestActorFactory(getSystem()); - actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-")); + actorRef = factory.createActor(MessageCollectorActor.props(), factory.generateActorId("test-")); doReturn(actorRef).when(mockRaftActorContext).getActor(); - snapshotManager.setCreateSnapshotCallable(mockProcedure); + snapshotManager.setCreateSnapshotConsumer(mockProcedure); } @After - public void tearDown(){ + public void tearDown() { factory.close(); } @Test - public void testConstruction(){ + public void testConstruction() { assertEquals(false, snapshotManager.isCapturing()); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test - public void testCaptureToInstall() throws Exception { + public void testCaptureToInstall() { // Force capturing toInstall = true - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, + snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload()), 0, "follower-1"); assertEquals(true, snapshotManager.isCapturing()); - verify(mockProcedure).apply(null); + ArgumentCaptor outputStream = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(outputStream.capture()); + assertEquals("isPresent", true, outputStream.getValue().isPresent()); CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); @@ -119,19 +144,22 @@ public class SnapshotManagerTest extends AbstractActorTest { // assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); - actorRef.underlyingActor().clear(); + MessageCollectorActor.clearMessages(actorRef); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testCapture() throws Exception { - boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + public void testCapture() { + boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, new MockRaftActorContext.MockPayload()), 9); assertTrue(capture); assertEquals(true, snapshotManager.isCapturing()); - verify(mockProcedure).apply(null); + ArgumentCaptor outputStream = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(outputStream.capture()); + assertEquals("isPresent", false, outputStream.getValue().isPresent()); CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); @@ -147,58 +175,86 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); - actorRef.underlyingActor().clear(); + MessageCollectorActor.clearMessages(actorRef); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testCaptureWithNullLastLogEntry() { + boolean capture = snapshotManager.capture(null, 1); + + assertTrue(capture); + + assertEquals(true, snapshotManager.isCapturing()); + + ArgumentCaptor outputStream = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(outputStream.capture()); + assertEquals("isPresent", false, outputStream.getValue().isPresent()); + + CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot(); + + // LastIndex and LastTerm are picked up from the lastLogEntry + assertEquals(0, captureSnapshot.getLastIndex()); + assertEquals(0, captureSnapshot.getLastTerm()); + + // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry + assertEquals(0, captureSnapshot.getLastAppliedIndex()); + assertEquals(0, captureSnapshot.getLastAppliedTerm()); + // + assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex()); + assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm()); + MessageCollectorActor.clearMessages(actorRef); } @Test - public void testCaptureWithCreateProcedureError () throws Exception { - doThrow(new Exception("mock")).when(mockProcedure).apply(null); + public void testCaptureWithCreateProcedureError() { + doThrow(new RuntimeException("mock")).when(mockProcedure).accept(any()); - boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, new MockRaftActorContext.MockPayload()), 9); assertFalse(capture); assertEquals(false, snapshotManager.isCapturing()); - verify(mockProcedure).apply(null); + verify(mockProcedure).accept(any()); } + @SuppressWarnings("unchecked") @Test - public void testIllegalCapture() throws Exception { - boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + public void testIllegalCapture() { + boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, new MockRaftActorContext.MockPayload()), 9); assertTrue(capture); - verify(mockProcedure).apply(null); + verify(mockProcedure).accept(any()); reset(mockProcedure); // This will not cause snapshot capture to start again - capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, new MockRaftActorContext.MockPayload()), 9); assertFalse(capture); - verify(mockProcedure, never()).apply(null); + verify(mockProcedure, never()).accept(any()); } @Test - public void testPersistWhenReplicatedToAllIndexMinusOne(){ + public void testPersistWhenReplicatedToAllIndexMinusOne() { doReturn(7L).when(mockReplicatedLog).getSnapshotIndex(); doReturn(1L).when(mockReplicatedLog).getSnapshotTerm(); - doReturn(ImmutableMap.builder().put("follower-1", "").build()).when(mockRaftActorContext).getPeerAddresses(); + doReturn(true).when(mockRaftActorContext).hasFollowers(); doReturn(8L).when(mockRaftActorContext).getLastApplied(); - MockRaftActorContext.MockReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 3L, 9L, new MockRaftActorContext.MockPayload()); + ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 3L, new MockRaftActorContext.MockPayload()); - MockRaftActorContext.MockReplicatedLogEntry lastAppliedEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 2L, 8L, new MockRaftActorContext.MockPayload()); + ReplicatedLogEntry lastAppliedEntry = new SimpleReplicatedLogEntry( + 8L, 2L, new MockRaftActorContext.MockPayload()); doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L); doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L); @@ -206,8 +262,8 @@ public class SnapshotManagerTest extends AbstractActorTest { // when replicatedToAllIndex = -1 snapshotManager.capture(lastLogEntry, -1); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10}); + snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -218,14 +274,16 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getLastIndex", 9L, snapshot.getLastIndex()); assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm()); assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex()); - assertArrayEquals("getState", bytes, snapshot.getState()); + assertEquals("getState", snapshotState, snapshot.getState()); assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries()); + assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm()); + assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor()); verify(mockReplicatedLog).snapshotPreCommit(7L, 1L); } @Test - public void testPersistWhenReplicatedToAllIndexNotMinus(){ + public void testPersistWhenReplicatedToAllIndexNotMinus() { doReturn(45L).when(mockReplicatedLog).getSnapshotIndex(); doReturn(6L).when(mockReplicatedLog).getSnapshotTerm(); ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); @@ -234,11 +292,10 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(9L).when(replicatedLogEntry).getIndex(); // when replicatedToAllIndex != -1 - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, - new MockRaftActorContext.MockPayload()), 9); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; - snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10}); + snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory()); ArgumentCaptor snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class); verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture()); @@ -249,7 +306,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getLastIndex", 9L, snapshot.getLastIndex()); assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm()); assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex()); - assertArrayEquals("getState", bytes, snapshot.getState()); + assertEquals("getState", snapshotState, snapshot.getState()); assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size()); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); @@ -257,35 +314,73 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockRaftActorBehavior).setReplicatedToAllIndex(9); } - @Test - public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){ + public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold() { doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); // when replicatedToAllIndex = -1 - snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9, - new MockRaftActorContext.MockPayload()), -1); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); + } + + @Test + public void testPersistWhenReplicatedLogSizeExceedsSnapshotBatchCount() { + doReturn(10L).when(mockReplicatedLog).size(); // matches snapshotBatchCount + doReturn(100).when(mockReplicatedLog).dataSize(); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + doReturn(5L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(5L).when(mockReplicatedLog).getSnapshotTerm(); + + long replicatedToAllIndex = 1; + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex); + doReturn(6L).when(replicatedLogEntry).getTerm(); + doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex(); + + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, + new MockRaftActorContext.MockPayload()), replicatedToAllIndex); + + snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testPersistSendInstallSnapshot(){ + public void testPersistSendInstallSnapshot() throws Exception { doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); + doNothing().when(mockProcedure).accept(any()); // when replicatedToAllIndex = -1 - boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1, "follower-1"); assertTrue(capture); - byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10}; + ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10}); + + ArgumentCaptor installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class); + verify(mockProcedure).accept(installSnapshotStreamCapture.capture()); + + Optional installSnapshotStream = installSnapshotStreamCapture.getValue(); + assertEquals("isPresent", true, installSnapshotStream.isPresent()); + + installSnapshotStream.get().write(snapshotState.getBytes()); - snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory()); + + assertEquals(true, snapshotManager.isCapturing()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); @@ -298,12 +393,13 @@ public class SnapshotManagerTest extends AbstractActorTest { SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue(); - assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray())); + assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState()); + assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read()); } @Test - public void testCallingPersistWithoutCaptureWillDoNothing(){ - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + public void testCallingPersistWithoutCaptureWillDoNothing() { + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class)); @@ -311,57 +407,59 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); } + @Test - public void testCallingPersistTwiceWillDoNoHarm(){ + public void testCallingPersistTwiceWillDoNoHarm() { doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); - - verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class)); } @Test - public void testCommit(){ + public void testCommit() { doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); + + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); + + assertEquals(true, snapshotManager.isCapturing()); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.commit(100L, 1234L); - snapshotManager.commit(100L); + assertEquals(false, snapshotManager.isCapturing()); verify(mockReplicatedLog).snapshotCommit(); verify(mockDataPersistenceProvider).deleteMessages(50L); - ArgumentCaptor criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); + ArgumentCaptor criteriaCaptor = + ArgumentCaptor.forClass(SnapshotSelectionCriteria.class); verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture()); - assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100 - // config snapShotBatchCount = 10 - // therefore maxSequenceNumber = 90 + assertEquals(Long.MAX_VALUE, criteriaCaptor.getValue().maxSequenceNr()); + assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp()); + + MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class); } @Test - public void testCommitBeforePersist(){ + public void testCommitBeforePersist() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.commit(100L); + snapshotManager.commit(100L, 0); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -372,8 +470,8 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testCommitBeforeCapture(){ - snapshotManager.commit(100L); + public void testCommitBeforeCapture() { + snapshotManager.commit(100L, 0); verify(mockReplicatedLog, never()).snapshotCommit(); @@ -384,18 +482,17 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testCallingCommitMultipleTimesCausesNoHarm(){ + public void testCallingCommitMultipleTimesCausesNoHarm() { doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber(); // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); - snapshotManager.commit(100L); + snapshotManager.commit(100L, 0); - snapshotManager.commit(100L); + snapshotManager.commit(100L, 0); verify(mockReplicatedLog, times(1)).snapshotCommit(); @@ -405,24 +502,24 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testRollback(){ + public void testRollback() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); verify(mockReplicatedLog).snapshotRollback(); + + MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class); } @Test - public void testRollbackBeforePersist(){ + public void testRollbackBeforePersist() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); snapshotManager.rollback(); @@ -430,19 +527,18 @@ public class SnapshotManagerTest extends AbstractActorTest { } @Test - public void testRollbackBeforeCapture(){ + public void testRollbackBeforeCapture() { snapshotManager.rollback(); verify(mockReplicatedLog, never()).snapshotRollback(); } @Test - public void testCallingRollbackMultipleTimesCausesNoHarm(){ + public void testCallingRollbackMultipleTimesCausesNoHarm() { // when replicatedToAllIndex = -1 - snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, - new MockRaftActorContext.MockPayload()), -1, "follower-1"); + snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1); - snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory()); snapshotManager.rollback(); @@ -457,14 +553,14 @@ public class SnapshotManagerTest extends AbstractActorTest { ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); doReturn(true).when(mockReplicatedLog).isPresent(10); - doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", 10L, retIndex); verify(mockReplicatedLog).snapshotPreCommit(10, 5); - verify(mockReplicatedLog).snapshotCommit(); + verify(mockReplicatedLog).snapshotCommit(false); verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); } @@ -475,14 +571,14 @@ public class SnapshotManagerTest extends AbstractActorTest { ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); doReturn(true).when(mockReplicatedLog).isPresent(10); - doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", -1L, retIndex); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); - verify(mockReplicatedLog, never()).snapshotCommit(); + verify(mockReplicatedLog, never()).snapshotCommit(false); verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); } @@ -493,14 +589,14 @@ public class SnapshotManagerTest extends AbstractActorTest { ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); doReturn(true).when(mockReplicatedLog).isPresent(10); - doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", -1L, retIndex); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); - verify(mockReplicatedLog, never()).snapshotCommit(); + verify(mockReplicatedLog, never()).snapshotCommit(false); verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); } @@ -511,19 +607,19 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(false).when(mockReplicatedLog).isPresent(10); - long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior); + long retIndex = snapshotManager.trimLog(10); assertEquals("return index", -1L, retIndex); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); - verify(mockReplicatedLog, never()).snapshotCommit(); + verify(mockReplicatedLog, never()).snapshotCommit(false); // Trim index is greater than replicatedToAllIndex so should update it. verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L); } @Test - public void testTrimLogAfterCapture(){ - boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9, + public void testTrimLogAfterCapture() { + boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, new MockRaftActorContext.MockPayload()), 9); assertTrue(capture); @@ -533,20 +629,19 @@ public class SnapshotManagerTest extends AbstractActorTest { ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); doReturn(20L).when(mockRaftActorContext).getLastApplied(); doReturn(true).when(mockReplicatedLog).isPresent(10); - doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10, mockRaftActorBehavior); + snapshotManager.trimLog(10); verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong()); - verify(mockReplicatedLog, never()).snapshotCommit(); - + verify(mockReplicatedLog, never()).snapshotCommit(false); } @Test - public void testTrimLogAfterCaptureToInstall(){ - boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9, - new MockRaftActorContext.MockPayload()), 9, "follower-1"); + public void testTrimLogAfterCaptureToInstall() { + boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1, + new MockRaftActorContext.MockPayload()), 9); assertTrue(capture); @@ -555,14 +650,13 @@ public class SnapshotManagerTest extends AbstractActorTest { ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); doReturn(20L).when(mockRaftActorContext).getLastApplied(); doReturn(true).when(mockReplicatedLog).isPresent(10); - doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10); doReturn(5L).when(replicatedLogEntry).getTerm(); - snapshotManager.trimLog(10, mockRaftActorBehavior); + snapshotManager.trimLog(10); verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5); verify(mockReplicatedLog, never()).snapshotCommit(); - } @Test @@ -573,7 +667,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(4L).when(mockReplicatedLog).getSnapshotTerm(); doReturn(7L).when(mockReplicatedLog).getSnapshotIndex(); - ReplicatedLogEntry lastLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(6L, 9L, + ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 6L, new MockRaftActorContext.MockPayload()); // No followers and valid lastLogEntry @@ -589,7 +683,7 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getIndex", -1L, reader.getIndex()); // Followers and valid originalIndex entry - doReturn(new MockRaftActorContext.MockReplicatedLogEntry(5L, 8L, + doReturn(new SimpleReplicatedLogEntry(8L, 5L, new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L); reader.init(mockReplicatedLog, 8L, lastLogEntry, true); @@ -609,4 +703,4 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getTerm", -1L, reader.getTerm()); assertEquals("getIndex", -1L, reader.getIndex()); } -} \ No newline at end of file +}