import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
+import java.io.OutputStream;
import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
private RaftActorBehavior mockRaftActorBehavior;
@Mock
- private Runnable mockProcedure;
+ private Consumer<Optional<OutputStream>> mockProcedure;
@Mock
private ElectionTerm mockElectionTerm;
actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
- snapshotManager.setCreateSnapshotRunnable(mockProcedure);
+ snapshotManager.setCreateSnapshotConsumer(mockProcedure);
}
@After
assertEquals(false, snapshotManager.isCapturing());
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testCaptureToInstall() throws Exception {
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", true, outputStream.getValue().isPresent());
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
actorRef.underlyingActor().clear();
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testCapture() throws Exception {
boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", false, outputStream.getValue().isPresent());
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testCaptureWithNullLastLogEntry() throws Exception {
boolean capture = snapshotManager.capture(null, 1);
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(outputStream.capture());
+ assertEquals("isPresent", false, outputStream.getValue().isPresent());
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
@Test
public void testCaptureWithCreateProcedureError() throws Exception {
- doThrow(new RuntimeException("mock")).when(mockProcedure).run();
+ doThrow(new RuntimeException("mock")).when(mockProcedure).accept(anyObject());
boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(false, snapshotManager.isCapturing());
- verify(mockProcedure).run();
+ verify(mockProcedure).accept(anyObject());
}
+ @SuppressWarnings("unchecked")
@Test
public void testIllegalCapture() throws Exception {
boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
assertTrue(capture);
- verify(mockProcedure).run();
+ verify(mockProcedure).accept(anyObject());
reset(mockProcedure);
assertFalse(capture);
- verify(mockProcedure, never()).run();
+ verify(mockProcedure, never()).accept(anyObject());
}
@Test
- public void testPersistWhenReplicatedToAllIndexMinusOne() {
+ public void testPersistWhenReplicatedToAllIndexMinusOne() throws Exception {
doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
// when replicatedToAllIndex = -1
snapshotManager.capture(lastLogEntry, -1);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+ snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
- assertArrayEquals("getState", bytes, snapshot.getState());
+ assertEquals("getState", snapshotState, snapshot.getState());
assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
}
@Test
- public void testPersistWhenReplicatedToAllIndexNotMinus() {
+ public void testPersistWhenReplicatedToAllIndexNotMinus() throws Exception {
doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
// when replicatedToAllIndex != -1
snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
+ snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
- assertArrayEquals("getState", bytes, snapshot.getState());
+ assertEquals("getState", snapshotState, snapshot.getState());
assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
// when replicatedToAllIndex = -1
snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
- snapshotManager.persist(new byte[]{}, 2000000L);
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testPersistSendInstallSnapshot() {
+ public void testPersistSendInstallSnapshot() throws Exception {
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
+ doNothing().when(mockProcedure).accept(anyObject());
// when replicatedToAllIndex = -1
boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
assertTrue(capture);
- byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
+ ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
- snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+ ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
+ verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
+
+ Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
+ assertEquals("isPresent", true, installSnapshotStream.isPresent());
+
+ installSnapshotStream.get().write(snapshotState.getBytes());
+
+ snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
assertEquals(true, snapshotManager.isCapturing());
SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
- assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
+ assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
+ assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
}
@Test
public void testCallingPersistWithoutCaptureWillDoNothing() {
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
- verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
}
@Test
doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
assertEquals(true, snapshotManager.isCapturing());
@Test
public void testCommitBeforePersist() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
snapshotManager.commit(100L, 0);
doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.commit(100L, 0);
@Test
public void testRollback() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
@Test
public void testRollbackBeforePersist() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
snapshotManager.rollback();
@Test
public void testCallingRollbackMultipleTimesCausesNoHarm() {
// when replicatedToAllIndex = -1
- snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
- new MockRaftActorContext.MockPayload()), -1, "follower-1");
+ snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
@Test
public void testTrimLogAfterCaptureToInstall() {
- boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 1,
- new MockRaftActorContext.MockPayload()), 9, "follower-1");
+ boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
+ new MockRaftActorContext.MockPayload()), 9);
assertTrue(capture);