1 package org.opendaylight.controller.cluster.raft;
3 import static junit.framework.TestCase.assertTrue;
4 import static org.junit.Assert.assertEquals;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Mockito.doReturn;
8 import static org.mockito.Mockito.mock;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorRef;
13 import akka.japi.Procedure;
14 import akka.persistence.SnapshotSelectionCriteria;
15 import akka.testkit.TestActorRef;
16 import java.util.Arrays;
17 import java.util.HashMap;
18 import java.util.List;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.ArgumentCaptor;
23 import org.mockito.Mock;
24 import org.mockito.Mockito;
25 import org.mockito.MockitoAnnotations;
26 import org.opendaylight.controller.cluster.DataPersistenceProvider;
27 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
28 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
31 import org.slf4j.LoggerFactory;
33 public class SnapshotManagerTest extends AbstractActorTest {
36 private RaftActorContext mockRaftActorContext;
39 private ConfigParams mockConfigParams;
42 private ReplicatedLog mockReplicatedLog;
45 private DataPersistenceProvider mockDataPersistenceProvider;
48 private RaftActorBehavior mockRaftActorBehavior;
51 private Procedure<Void> mockProcedure;
53 private SnapshotManager snapshotManager;
55 private TestActorFactory factory;
57 private TestActorRef<MessageCollectorActor> actorRef;
61 MockitoAnnotations.initMocks(this);
63 doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses();
64 doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
65 doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
66 doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
67 doReturn("123").when(mockRaftActorContext).getId();
68 doReturn("123").when(mockRaftActorBehavior).getLeaderId();
70 snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
71 factory = new TestActorFactory(getSystem());
73 actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
74 doReturn(actorRef).when(mockRaftActorContext).getActor();
79 public void tearDown(){
84 public void testConstruction(){
85 assertEquals(false, snapshotManager.isCapturing());
89 public void testCaptureToInstall(){
91 // Force capturing toInstall = true
92 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
93 new MockRaftActorContext.MockPayload()), 0);
95 assertEquals(true, snapshotManager.isCapturing());
97 CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
99 // LastIndex and LastTerm are picked up from the lastLogEntry
100 assertEquals(0L, captureSnapshot.getLastIndex());
101 assertEquals(1L, captureSnapshot.getLastTerm());
103 // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
104 assertEquals(0L, captureSnapshot.getLastAppliedIndex());
105 assertEquals(1L, captureSnapshot.getLastAppliedTerm());
108 assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
109 assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
110 actorRef.underlyingActor().clear();
114 public void testCapture(){
115 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
116 new MockRaftActorContext.MockPayload()), 9);
118 assertEquals(true, snapshotManager.isCapturing());
120 CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
121 // LastIndex and LastTerm are picked up from the lastLogEntry
122 assertEquals(9L, captureSnapshot.getLastIndex());
123 assertEquals(1L, captureSnapshot.getLastTerm());
125 // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
126 assertEquals(9L, captureSnapshot.getLastAppliedIndex());
127 assertEquals(1L, captureSnapshot.getLastAppliedTerm());
130 assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
131 assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
133 actorRef.underlyingActor().clear();
138 public void testIllegalCapture() throws Exception {
139 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
140 new MockRaftActorContext.MockPayload()), 9);
142 List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
144 assertEquals(1, allMatching.size());
146 // This will not cause snapshot capture to start again
147 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
148 new MockRaftActorContext.MockPayload()), 9);
150 allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
152 assertEquals(1, allMatching.size());
156 public void testPersistWhenReplicatedToAllIndexMinusOne(){
157 doReturn("123").when(mockRaftActorContext).getId();
158 doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
159 doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
161 // when replicatedToAllIndex = -1
162 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
163 new MockRaftActorContext.MockPayload()), -1);
165 snapshotManager.create(mockProcedure);
167 byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
168 snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
170 ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
171 verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
173 Snapshot snapshot = snapshotArgumentCaptor.getValue();
175 assertEquals(6, snapshot.getLastAppliedTerm());
176 assertEquals(9, snapshot.getLastAppliedIndex());
177 assertEquals(9, snapshot.getLastIndex());
178 assertEquals(6, snapshot.getLastTerm());
179 assertEquals(10, snapshot.getState().length);
180 assertTrue(Arrays.equals(bytes, snapshot.getState()));
181 assertEquals(0, snapshot.getUnAppliedEntries().size());
183 verify(mockReplicatedLog).snapshotPreCommit(45L, 6L);
188 public void testCreate() throws Exception {
189 // when replicatedToAllIndex = -1
190 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
191 new MockRaftActorContext.MockPayload()), -1);
193 snapshotManager.create(mockProcedure);
195 verify(mockProcedure).apply(null);
199 public void testCallingCreateMultipleTimesCausesNoHarm() throws Exception {
200 // when replicatedToAllIndex = -1
201 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
202 new MockRaftActorContext.MockPayload()), -1);
204 snapshotManager.create(mockProcedure);
206 snapshotManager.create(mockProcedure);
208 verify(mockProcedure, times(1)).apply(null);
212 public void testCallingCreateBeforeCapture() throws Exception {
213 snapshotManager.create(mockProcedure);
215 verify(mockProcedure, times(0)).apply(null);
219 public void testCallingCreateAfterPersist() throws Exception {
220 // when replicatedToAllIndex = -1
221 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
222 new MockRaftActorContext.MockPayload()), -1);
224 snapshotManager.create(mockProcedure);
226 verify(mockProcedure, times(1)).apply(null);
228 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
230 Mockito.reset(mockProcedure);
232 snapshotManager.create(mockProcedure);
234 verify(mockProcedure, never()).apply(null);
238 public void testPersistWhenReplicatedToAllIndexNotMinus(){
239 doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
240 doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
241 ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
242 doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
243 doReturn(6L).when(replicatedLogEntry).getTerm();
244 doReturn(9L).when(replicatedLogEntry).getIndex();
246 // when replicatedToAllIndex != -1
247 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
248 new MockRaftActorContext.MockPayload()), 9);
250 snapshotManager.create(mockProcedure);
252 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
254 verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
256 verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
258 verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
263 public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){
264 doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
266 // when replicatedToAllIndex = -1
267 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6,9,
268 new MockRaftActorContext.MockPayload()), -1);
270 snapshotManager.create(mockProcedure);
272 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
274 verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
276 verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
278 verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1);
282 public void testPersistSendInstallSnapshot(){
283 doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
285 // when replicatedToAllIndex = -1
286 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
287 new MockRaftActorContext.MockPayload()), -1);
289 snapshotManager.create(mockProcedure);
291 byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
293 snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
295 verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
297 verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
299 ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
300 = ArgumentCaptor.forClass(SendInstallSnapshot.class);
302 verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
304 SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
306 assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
310 public void testCallingPersistWithoutCaptureWillDoNothing(){
311 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
313 verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
315 verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
317 verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
320 public void testCallingPersistTwiceWillDoNoHarm(){
321 doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
323 // when replicatedToAllIndex = -1
324 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
325 new MockRaftActorContext.MockPayload()), -1);
327 snapshotManager.create(mockProcedure);
329 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
331 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
333 verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
335 verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
337 verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
341 public void testCommit(){
342 // when replicatedToAllIndex = -1
343 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
344 new MockRaftActorContext.MockPayload()), -1);
346 snapshotManager.create(mockProcedure);
348 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
350 snapshotManager.commit(mockDataPersistenceProvider, 100L);
352 verify(mockReplicatedLog).snapshotCommit();
354 verify(mockDataPersistenceProvider).deleteMessages(100L);
356 ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
358 verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
360 assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
361 // config snapShotBatchCount = 10
362 // therefore maxSequenceNumber = 90
366 public void testCommitBeforePersist(){
367 // when replicatedToAllIndex = -1
368 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
369 new MockRaftActorContext.MockPayload()), -1);
371 snapshotManager.commit(mockDataPersistenceProvider, 100L);
373 verify(mockReplicatedLog, never()).snapshotCommit();
375 verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
377 verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
382 public void testCommitBeforeCapture(){
383 snapshotManager.commit(mockDataPersistenceProvider, 100L);
385 verify(mockReplicatedLog, never()).snapshotCommit();
387 verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
389 verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
394 public void testCallingCommitMultipleTimesCausesNoHarm(){
395 // when replicatedToAllIndex = -1
396 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
397 new MockRaftActorContext.MockPayload()), -1);
399 snapshotManager.create(mockProcedure);
401 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
403 snapshotManager.commit(mockDataPersistenceProvider, 100L);
405 snapshotManager.commit(mockDataPersistenceProvider, 100L);
407 verify(mockReplicatedLog, times(1)).snapshotCommit();
409 verify(mockDataPersistenceProvider, times(1)).deleteMessages(100L);
411 verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
415 public void testRollback(){
416 // when replicatedToAllIndex = -1
417 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
418 new MockRaftActorContext.MockPayload()), -1);
420 snapshotManager.create(mockProcedure);
422 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
424 snapshotManager.rollback();
426 verify(mockReplicatedLog).snapshotRollback();
431 public void testRollbackBeforePersist(){
432 // when replicatedToAllIndex = -1
433 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
434 new MockRaftActorContext.MockPayload()), -1);
436 snapshotManager.rollback();
438 verify(mockReplicatedLog, never()).snapshotRollback();
442 public void testRollbackBeforeCapture(){
443 snapshotManager.rollback();
445 verify(mockReplicatedLog, never()).snapshotRollback();
449 public void testCallingRollbackMultipleTimesCausesNoHarm(){
450 // when replicatedToAllIndex = -1
451 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
452 new MockRaftActorContext.MockPayload()), -1);
454 snapshotManager.create(mockProcedure);
456 snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
458 snapshotManager.rollback();
460 snapshotManager.rollback();
462 verify(mockReplicatedLog, times(1)).snapshotRollback();
466 public void testTrimLog(){
467 ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
468 ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
469 doReturn(20L).when(mockRaftActorContext).getLastApplied();
470 doReturn(true).when(mockReplicatedLog).isPresent(10);
471 doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
472 doReturn(5L).when(mockElectionTerm).getCurrentTerm();
473 doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
474 doReturn(5L).when(replicatedLogEntry).getTerm();
476 snapshotManager.trimLog(10);
478 verify(mockReplicatedLog).snapshotPreCommit(10, 5);
479 verify(mockReplicatedLog).snapshotCommit();
483 public void testTrimLogAfterCapture(){
484 snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
485 new MockRaftActorContext.MockPayload()), 9);
487 assertEquals(true, snapshotManager.isCapturing());
489 ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
490 ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
491 doReturn(20L).when(mockRaftActorContext).getLastApplied();
492 doReturn(true).when(mockReplicatedLog).isPresent(10);
493 doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
494 doReturn(5L).when(mockElectionTerm).getCurrentTerm();
495 doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
496 doReturn(5L).when(replicatedLogEntry).getTerm();
498 snapshotManager.trimLog(10);
500 verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
501 verify(mockReplicatedLog, never()).snapshotCommit();
506 public void testTrimLogAfterCaptureToInstall(){
507 snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
508 new MockRaftActorContext.MockPayload()), 9);
510 assertEquals(true, snapshotManager.isCapturing());
512 ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
513 ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
514 doReturn(20L).when(mockRaftActorContext).getLastApplied();
515 doReturn(true).when(mockReplicatedLog).isPresent(10);
516 doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
517 doReturn(5L).when(mockElectionTerm).getCurrentTerm();
518 doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
519 doReturn(5L).when(replicatedLogEntry).getTerm();
521 snapshotManager.trimLog(10);
523 verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
524 verify(mockReplicatedLog, never()).snapshotCommit();