+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
-import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.TestActorRef;
-import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
-import java.util.HashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
private RaftActorBehavior mockRaftActorBehavior;
@Mock
- private Procedure<Void> mockProcedure;
+ private Runnable mockProcedure;
+
+ @Mock
+ private ElectionTerm mockElectionTerm;
private SnapshotManager snapshotManager;
public void setUp(){
MockitoAnnotations.initMocks(this);
- doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(false).when(mockRaftActorContext).hasFollowers();
doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
+ doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
doReturn("123").when(mockRaftActorContext).getId();
doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
+ doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
doReturn("123").when(mockRaftActorBehavior).getLeaderId();
- ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
doReturn(5L).when(mockElectionTerm).getCurrentTerm();
+ doReturn("member5").when(mockElectionTerm).getVotedFor();
snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
factory = new TestActorFactory(getSystem());
actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
doReturn(actorRef).when(mockRaftActorContext).getActor();
- snapshotManager.setCreateSnapshotCallable(mockProcedure);
+ snapshotManager.setCreateSnapshotRunnable(mockProcedure);
}
@After
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
assertEquals(true, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
}
+ @Test
+ public void testCaptureWithNullLastLogEntry() throws Exception {
+ boolean capture = snapshotManager.capture(null, 1);
+
+ assertTrue(capture);
+
+ assertEquals(true, snapshotManager.isCapturing());
+
+ verify(mockProcedure).run();
+
+ CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
+
+ System.out.println(captureSnapshot);
+
+ // LastIndex and LastTerm are picked up from the lastLogEntry
+ assertEquals(-1L, captureSnapshot.getLastIndex());
+ assertEquals(-1L, captureSnapshot.getLastTerm());
+
+ // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
+ assertEquals(-1L, captureSnapshot.getLastAppliedIndex());
+ assertEquals(-1L, captureSnapshot.getLastAppliedTerm());
+
+ //
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
+ assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
+ actorRef.underlyingActor().clear();
+
+ }
+
@Test
public void testCaptureWithCreateProcedureError () throws Exception {
- doThrow(new Exception("mock")).when(mockProcedure).apply(null);
+ doThrow(new RuntimeException("mock")).when(mockProcedure).run();
boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
new MockRaftActorContext.MockPayload()), 9);
assertEquals(false, snapshotManager.isCapturing());
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
}
@Test
assertTrue(capture);
- verify(mockProcedure).apply(null);
+ verify(mockProcedure).run();
reset(mockProcedure);
assertFalse(capture);
- verify(mockProcedure, never()).apply(null);
+ verify(mockProcedure, never()).run();
}
@Test
doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
- doReturn(ImmutableMap.builder().put("follower-1", "").build()).when(mockRaftActorContext).getPeerAddresses();
+ doReturn(true).when(mockRaftActorContext).hasFollowers();
doReturn(8L).when(mockRaftActorContext).getLastApplied();
snapshotManager.capture(lastLogEntry, -1);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
assertArrayEquals("getState", bytes, snapshot.getState());
assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
+ assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
+ assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
}
new MockRaftActorContext.MockPayload()), 9);
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
}
-
@Test
public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
new MockRaftActorContext.MockPayload()), -1);
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
+ }
+
+ @Test
+ public void testPersistWhenReplicatedLogSizeExceedsSnapshotBatchCount() {
+ doReturn(10L).when(mockReplicatedLog).size(); // matches snapshotBatchCount
+ doReturn(100).when(mockReplicatedLog).dataSize();
+
+ doReturn(5L).when(mockReplicatedLog).getSnapshotIndex();
+ doReturn(5L).when(mockReplicatedLog).getSnapshotTerm();
+
+ long replicatedToAllIndex = 1;
+ ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
+ doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex);
+ doReturn(6L).when(replicatedLogEntry).getTerm();
+ doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex();
+
+ snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+ new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
+
+ snapshotManager.persist(new byte[]{}, 2000000L);
+
+ verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
+
+ verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
+
+ verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
}
@Test
byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
- snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(bytes, Runtime.getRuntime().totalMemory());
+
+ assertEquals(true, snapshotManager.isCapturing());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
- assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+ assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
}
@Test
public void testCallingPersistWithoutCaptureWillDoNothing(){
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L);
+ assertEquals(true, snapshotManager.isCapturing());
+
+ snapshotManager.commit(100L, 1234L);
+
+ assertEquals(false, snapshotManager.isCapturing());
verify(mockReplicatedLog).snapshotCommit();
verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
- assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
- // config snapShotBatchCount = 10
- // therefore maxSequenceNumber = 90
+ assertEquals(100L, criteriaCaptor.getValue().maxSequenceNr());
+ assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
+
+ MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
@Test
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, times(1)).snapshotCommit();
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
verify(mockReplicatedLog).snapshotRollback();
+
+ MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory());
+ snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
snapshotManager.rollback();
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", 10L, retIndex);
verify(mockReplicatedLog).snapshotPreCommit(10, 5);
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", -1L, retIndex);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", -1L, retIndex);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
doReturn(false).when(mockReplicatedLog).isPresent(10);
- long retIndex = snapshotManager.trimLog(10, mockRaftActorBehavior);
+ long retIndex = snapshotManager.trimLog(10);
assertEquals("return index", -1L, retIndex);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ snapshotManager.trimLog(10);
verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
verify(mockReplicatedLog, never()).snapshotCommit();
doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
doReturn(5L).when(replicatedLogEntry).getTerm();
- snapshotManager.trimLog(10, mockRaftActorBehavior);
+ snapshotManager.trimLog(10);
verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
verify(mockReplicatedLog, never()).snapshotCommit();
assertEquals("getTerm", -1L, reader.getTerm());
assertEquals("getIndex", -1L, reader.getIndex());
}
-}
\ No newline at end of file
+}