import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.testkit.TestActorRef;
+import java.io.OutputStream;
import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
private RaftActorBehavior mockRaftActorBehavior;
@Mock
- private Runnable mockProcedure;
+ private Consumer<Optional<OutputStream>> mockProcedure;
@Mock
private ElectionTerm mockElectionTerm;
private TestActorFactory factory;
- private TestActorRef<MessageCollectorActor> actorRef;
+ private ActorRef actorRef;
@Before
public void setUp() {
doReturn(5L).when(mockElectionTerm).getCurrentTerm();
doReturn("member5").when(mockElectionTerm).getVotedFor();
+ doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
+ .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
+
snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
factory = new TestActorFactory(getSystem());
- actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
+ actorRef = factory.createActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
- snapshotManager.setCreateSnapshotRunnable(mockProcedure);
+ snapshotManager.setCreateSnapshotConsumer(mockProcedure);
}
@After
assertEquals(false, snapshotManager.isCapturing());
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testCaptureToInstall() throws Exception {
+ public void testCaptureToInstall() {
// Force capturing toInstall = true
snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(0, 1,
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", true, outputStream.getValue().isPresent());
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
//
assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
- actorRef.underlyingActor().clear();
+ MessageCollectorActor.clearMessages(actorRef);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testCapture() throws Exception {
+ public void testCapture() {
boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", false, outputStream.getValue().isPresent());
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
- actorRef.underlyingActor().clear();
-
+ MessageCollectorActor.clearMessages(actorRef);
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testCaptureWithNullLastLogEntry() throws Exception {
+ public void testCaptureWithNullLastLogEntry() {
boolean capture = snapshotManager.capture(null, 1);
assertTrue(capture);
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", false, outputStream.getValue().isPresent());
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
//
assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
- actorRef.underlyingActor().clear();
-
+ MessageCollectorActor.clearMessages(actorRef);
}
@Test
- public void testCaptureWithCreateProcedureError() throws Exception {
- doThrow(new RuntimeException("mock")).when(mockProcedure).run();
+ public void testCaptureWithCreateProcedureError() {
+ doThrow(new RuntimeException("mock")).when(mockProcedure).accept(any());
boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(false, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ verify(mockProcedure).accept(any());
}
+ @SuppressWarnings("unchecked")
@Test
- public void testIllegalCapture() throws Exception {
+ public void testIllegalCapture() {
boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
new MockRaftActorContext.MockPayload()), 9);
assertTrue(capture);
- verify(mockProcedure).run();
+ verify(mockProcedure).accept(any());
reset(mockProcedure);
assertFalse(capture);
- verify(mockProcedure, never()).run();
+ verify(mockProcedure, never()).accept(any());
}
@Test
// when replicatedToAllIndex = -1
snapshotManager.capture(lastLogEntry, -1);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+ snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
- assertArrayEquals("getState", bytes, snapshot.getState());
+ assertEquals("getState", snapshotState, snapshot.getState());
assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
// when replicatedToAllIndex != -1
snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+ snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
- assertArrayEquals("getState", bytes, snapshot.getState());
+ assertEquals("getState", snapshotState, snapshot.getState());
assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
// when replicatedToAllIndex = -1
snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
- snapshotManager.persist(new byte[]{}, 2000000L);
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testPersistSendInstallSnapshot() {
+ public void testPersistSendInstallSnapshot() throws Exception {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+ doNothing().when(mockProcedure).accept(any());
// when replicatedToAllIndex = -1
boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
assertTrue(capture);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+
+ ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
+
+ Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
+ assertEquals("isPresent", true, installSnapshotStream.isPresent());
+
+ installSnapshotStream.get().write(snapshotState.getBytes());
- snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
assertEquals(true, snapshotManager.isCapturing());
SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
- assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
+ assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
+ assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
}
@Test
public void testCallingPersistWithoutCaptureWillDoNothing() {
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
- verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
}
@Test
doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
assertEquals(true, snapshotManager.isCapturing());
@Test
public void testCommitBeforePersist() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
snapshotManager.commit(100L, 0);
doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.commit(100L, 0);
@Test
public void testRollback() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
@Test
public void testRollbackBeforePersist() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
snapshotManager.rollback();
@Test
public void testCallingRollbackMultipleTimesCausesNoHarm() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
@Test
public void testTrimLogAfterCaptureToInstall() {
- boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 1,
- new MockRaftActorContext.MockPayload()), 9, "follower-1");
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+ new MockRaftActorContext.MockPayload()), 9);
assertTrue(capture);