Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / SnapshotManagerTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertFalse;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyLong;
17 import static org.mockito.Matchers.anyObject;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
26
27 import akka.actor.ActorRef;
28 import akka.persistence.SnapshotSelectionCriteria;
29 import akka.testkit.TestActorRef;
30 import java.io.OutputStream;
31 import java.util.Arrays;
32 import java.util.Optional;
33 import java.util.function.Consumer;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.ArgumentCaptor;
38 import org.mockito.Mock;
39 import org.mockito.MockitoAnnotations;
40 import org.opendaylight.controller.cluster.DataPersistenceProvider;
41 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
42 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
43 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
46 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
47 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
48 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
49 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
50 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
51 import org.slf4j.LoggerFactory;
52
53 public class SnapshotManagerTest extends AbstractActorTest {
54
55     @Mock
56     private RaftActorContext mockRaftActorContext;
57
58     @Mock
59     private ConfigParams mockConfigParams;
60
61     @Mock
62     private ReplicatedLog mockReplicatedLog;
63
64     @Mock
65     private DataPersistenceProvider mockDataPersistenceProvider;
66
67     @Mock
68     private RaftActorBehavior mockRaftActorBehavior;
69
70     @Mock
71     private Consumer<Optional<OutputStream>> mockProcedure;
72
73     @Mock
74     private ElectionTerm mockElectionTerm;
75
76     private SnapshotManager snapshotManager;
77
78     private TestActorFactory factory;
79
80     private TestActorRef<MessageCollectorActor> actorRef;
81
82     @Before
83     public void setUp() {
84         MockitoAnnotations.initMocks(this);
85
86         doReturn(false).when(mockRaftActorContext).hasFollowers();
87         doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
88         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
89         doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
90         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
91         doReturn("123").when(mockRaftActorContext).getId();
92         doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
93         doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
94         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
95
96         doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
97         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
98         doReturn("member5").when(mockElectionTerm).getVotedFor();
99
100         doReturn(new FileBackedOutputStream(10000000, "target")).when(mockRaftActorContext).newFileBackedOutputStream();
101
102         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
103         factory = new TestActorFactory(getSystem());
104
105         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
106         doReturn(actorRef).when(mockRaftActorContext).getActor();
107
108         snapshotManager.setCreateSnapshotConsumer(mockProcedure);
109     }
110
111     @After
112     public void tearDown() {
113         factory.close();
114     }
115
116     @Test
117     public void testConstruction() {
118         assertEquals(false, snapshotManager.isCapturing());
119     }
120
121     @SuppressWarnings({ "unchecked", "rawtypes" })
122     @Test
123     public void testCaptureToInstall() throws Exception {
124
125         // Force capturing toInstall = true
126         snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(0, 1,
127                 new MockRaftActorContext.MockPayload()), 0, "follower-1");
128
129         assertEquals(true, snapshotManager.isCapturing());
130
131         ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
132         verify(mockProcedure).accept(outputStream.capture());
133         assertEquals("isPresent", true, outputStream.getValue().isPresent());
134
135         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
136
137         // LastIndex and LastTerm are picked up from the lastLogEntry
138         assertEquals(0L, captureSnapshot.getLastIndex());
139         assertEquals(1L, captureSnapshot.getLastTerm());
140
141         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
142         assertEquals(0L, captureSnapshot.getLastAppliedIndex());
143         assertEquals(1L, captureSnapshot.getLastAppliedTerm());
144
145         //
146         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
147         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
148         actorRef.underlyingActor().clear();
149     }
150
151     @SuppressWarnings({ "rawtypes", "unchecked" })
152     @Test
153     public void testCapture() throws Exception {
154         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
155                 new MockRaftActorContext.MockPayload()), 9);
156
157         assertTrue(capture);
158
159         assertEquals(true, snapshotManager.isCapturing());
160
161         ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
162         verify(mockProcedure).accept(outputStream.capture());
163         assertEquals("isPresent", false, outputStream.getValue().isPresent());
164
165         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
166
167         // LastIndex and LastTerm are picked up from the lastLogEntry
168         assertEquals(9L, captureSnapshot.getLastIndex());
169         assertEquals(1L, captureSnapshot.getLastTerm());
170
171         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
172         assertEquals(9L, captureSnapshot.getLastAppliedIndex());
173         assertEquals(1L, captureSnapshot.getLastAppliedTerm());
174
175         //
176         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
177         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
178
179         actorRef.underlyingActor().clear();
180
181     }
182
183     @SuppressWarnings({ "unchecked", "rawtypes" })
184     @Test
185     public void testCaptureWithNullLastLogEntry() throws Exception {
186         boolean capture = snapshotManager.capture(null, 1);
187
188         assertTrue(capture);
189
190         assertEquals(true, snapshotManager.isCapturing());
191
192         ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
193         verify(mockProcedure).accept(outputStream.capture());
194         assertEquals("isPresent", false, outputStream.getValue().isPresent());
195
196         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
197
198         // LastIndex and LastTerm are picked up from the lastLogEntry
199         assertEquals(-1L, captureSnapshot.getLastIndex());
200         assertEquals(-1L, captureSnapshot.getLastTerm());
201
202         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
203         assertEquals(-1L, captureSnapshot.getLastAppliedIndex());
204         assertEquals(-1L, captureSnapshot.getLastAppliedTerm());
205
206         //
207         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
208         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
209         actorRef.underlyingActor().clear();
210
211     }
212
213     @Test
214     public void testCaptureWithCreateProcedureError() throws Exception {
215         doThrow(new RuntimeException("mock")).when(mockProcedure).accept(anyObject());
216
217         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
218                 new MockRaftActorContext.MockPayload()), 9);
219
220         assertFalse(capture);
221
222         assertEquals(false, snapshotManager.isCapturing());
223
224         verify(mockProcedure).accept(anyObject());
225     }
226
227     @SuppressWarnings("unchecked")
228     @Test
229     public void testIllegalCapture() throws Exception {
230         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
231                 new MockRaftActorContext.MockPayload()), 9);
232
233         assertTrue(capture);
234
235         verify(mockProcedure).accept(anyObject());
236
237         reset(mockProcedure);
238
239         // This will not cause snapshot capture to start again
240         capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
241                 new MockRaftActorContext.MockPayload()), 9);
242
243         assertFalse(capture);
244
245         verify(mockProcedure, never()).accept(anyObject());
246     }
247
248     @Test
249     public void testPersistWhenReplicatedToAllIndexMinusOne() throws Exception {
250         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
251         doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
252
253         doReturn(true).when(mockRaftActorContext).hasFollowers();
254
255         doReturn(8L).when(mockRaftActorContext).getLastApplied();
256
257         ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 3L, new MockRaftActorContext.MockPayload());
258
259         ReplicatedLogEntry lastAppliedEntry = new SimpleReplicatedLogEntry(
260                 8L, 2L, new MockRaftActorContext.MockPayload());
261
262         doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L);
263         doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L);
264
265         // when replicatedToAllIndex = -1
266         snapshotManager.capture(lastLogEntry, -1);
267
268         ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
269         snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
270
271         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
272         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
273
274         Snapshot snapshot = snapshotArgumentCaptor.getValue();
275
276         assertEquals("getLastTerm", 3L, snapshot.getLastTerm());
277         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
278         assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
279         assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
280         assertEquals("getState", snapshotState, snapshot.getState());
281         assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
282         assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
283         assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
284
285         verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
286     }
287
288     @Test
289     public void testPersistWhenReplicatedToAllIndexNotMinus() throws Exception {
290         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
291         doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
292         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
293         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
294         doReturn(6L).when(replicatedLogEntry).getTerm();
295         doReturn(9L).when(replicatedLogEntry).getIndex();
296
297         // when replicatedToAllIndex != -1
298         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
299
300         ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
301         snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
302
303         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
304         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
305
306         Snapshot snapshot = snapshotArgumentCaptor.getValue();
307
308         assertEquals("getLastTerm", 6L, snapshot.getLastTerm());
309         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
310         assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
311         assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
312         assertEquals("getState", snapshotState, snapshot.getState());
313         assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
314
315         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
316
317         verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
318     }
319
320     @Test
321     public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold() {
322         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
323
324         // when replicatedToAllIndex = -1
325         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
326
327         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
328
329         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
330
331         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
332
333         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
334     }
335
336     @Test
337     public void testPersistWhenReplicatedLogSizeExceedsSnapshotBatchCount() {
338         doReturn(10L).when(mockReplicatedLog).size(); // matches snapshotBatchCount
339         doReturn(100).when(mockReplicatedLog).dataSize();
340
341         doReturn(5L).when(mockReplicatedLog).getSnapshotIndex();
342         doReturn(5L).when(mockReplicatedLog).getSnapshotTerm();
343
344         long replicatedToAllIndex = 1;
345         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
346         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex);
347         doReturn(6L).when(replicatedLogEntry).getTerm();
348         doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex();
349
350         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
351                 new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
352
353         snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
354
355         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
356
357         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
358
359         verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
360     }
361
362     @SuppressWarnings({ "rawtypes", "unchecked" })
363     @Test
364     public void testPersistSendInstallSnapshot() throws Exception {
365         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
366         doNothing().when(mockProcedure).accept(anyObject());
367
368         // when replicatedToAllIndex = -1
369         boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
370                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
371
372         assertTrue(capture);
373
374         ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
375
376         ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
377         verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
378
379         Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
380         assertEquals("isPresent", true, installSnapshotStream.isPresent());
381
382         installSnapshotStream.get().write(snapshotState.getBytes());
383
384         snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
385
386         assertEquals(true, snapshotManager.isCapturing());
387
388         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
389
390         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
391
392         ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
393                 = ArgumentCaptor.forClass(SendInstallSnapshot.class);
394
395         verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
396
397         SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
398
399         assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
400         assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
401     }
402
403     @Test
404     public void testCallingPersistWithoutCaptureWillDoNothing() {
405         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
406
407         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
408
409         verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
410
411         verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
412     }
413
414     @Test
415     public void testCallingPersistTwiceWillDoNoHarm() {
416         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
417
418         // when replicatedToAllIndex = -1
419         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
420
421         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
422
423         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
424
425         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
426
427         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
428     }
429
430     @Test
431     public void testCommit() {
432         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
433
434         // when replicatedToAllIndex = -1
435         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
436
437         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
438
439         assertEquals(true, snapshotManager.isCapturing());
440
441         snapshotManager.commit(100L, 1234L);
442
443         assertEquals(false, snapshotManager.isCapturing());
444
445         verify(mockReplicatedLog).snapshotCommit();
446
447         verify(mockDataPersistenceProvider).deleteMessages(50L);
448
449         ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor =
450                 ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
451
452         verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
453
454         assertEquals(100L, criteriaCaptor.getValue().maxSequenceNr());
455         assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
456
457         MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
458     }
459
460     @Test
461     public void testCommitBeforePersist() {
462         // when replicatedToAllIndex = -1
463         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
464
465         snapshotManager.commit(100L, 0);
466
467         verify(mockReplicatedLog, never()).snapshotCommit();
468
469         verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
470
471         verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
472
473     }
474
475     @Test
476     public void testCommitBeforeCapture() {
477         snapshotManager.commit(100L, 0);
478
479         verify(mockReplicatedLog, never()).snapshotCommit();
480
481         verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
482
483         verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
484
485     }
486
487     @Test
488     public void testCallingCommitMultipleTimesCausesNoHarm() {
489         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
490
491         // when replicatedToAllIndex = -1
492         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
493
494         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
495
496         snapshotManager.commit(100L, 0);
497
498         snapshotManager.commit(100L, 0);
499
500         verify(mockReplicatedLog, times(1)).snapshotCommit();
501
502         verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
503
504         verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
505     }
506
507     @Test
508     public void testRollback() {
509         // when replicatedToAllIndex = -1
510         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
511
512         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
513
514         snapshotManager.rollback();
515
516         verify(mockReplicatedLog).snapshotRollback();
517
518         MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
519     }
520
521
522     @Test
523     public void testRollbackBeforePersist() {
524         // when replicatedToAllIndex = -1
525         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
526
527         snapshotManager.rollback();
528
529         verify(mockReplicatedLog, never()).snapshotRollback();
530     }
531
532     @Test
533     public void testRollbackBeforeCapture() {
534         snapshotManager.rollback();
535
536         verify(mockReplicatedLog, never()).snapshotRollback();
537     }
538
539     @Test
540     public void testCallingRollbackMultipleTimesCausesNoHarm() {
541         // when replicatedToAllIndex = -1
542         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
543
544         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
545
546         snapshotManager.rollback();
547
548         snapshotManager.rollback();
549
550         verify(mockReplicatedLog, times(1)).snapshotRollback();
551     }
552
553     @Test
554     public void testTrimLogWhenTrimIndexLessThanLastApplied() {
555         doReturn(20L).when(mockRaftActorContext).getLastApplied();
556
557         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
558         doReturn(true).when(mockReplicatedLog).isPresent(10);
559         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
560         doReturn(5L).when(replicatedLogEntry).getTerm();
561
562         long retIndex = snapshotManager.trimLog(10);
563         assertEquals("return index", 10L, retIndex);
564
565         verify(mockReplicatedLog).snapshotPreCommit(10, 5);
566         verify(mockReplicatedLog).snapshotCommit();
567
568         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
569     }
570
571     @Test
572     public void testTrimLogWhenLastAppliedNotSet() {
573         doReturn(-1L).when(mockRaftActorContext).getLastApplied();
574
575         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
576         doReturn(true).when(mockReplicatedLog).isPresent(10);
577         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
578         doReturn(5L).when(replicatedLogEntry).getTerm();
579
580         long retIndex = snapshotManager.trimLog(10);
581         assertEquals("return index", -1L, retIndex);
582
583         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
584         verify(mockReplicatedLog, never()).snapshotCommit();
585
586         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
587     }
588
589     @Test
590     public void testTrimLogWhenLastAppliedZero() {
591         doReturn(0L).when(mockRaftActorContext).getLastApplied();
592
593         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
594         doReturn(true).when(mockReplicatedLog).isPresent(10);
595         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
596         doReturn(5L).when(replicatedLogEntry).getTerm();
597
598         long retIndex = snapshotManager.trimLog(10);
599         assertEquals("return index", -1L, retIndex);
600
601         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
602         verify(mockReplicatedLog, never()).snapshotCommit();
603
604         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
605     }
606
607     @Test
608     public void testTrimLogWhenTrimIndexNotPresent() {
609         doReturn(20L).when(mockRaftActorContext).getLastApplied();
610
611         doReturn(false).when(mockReplicatedLog).isPresent(10);
612
613         long retIndex = snapshotManager.trimLog(10);
614         assertEquals("return index", -1L, retIndex);
615
616         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
617         verify(mockReplicatedLog, never()).snapshotCommit();
618
619         // Trim index is greater than replicatedToAllIndex so should update it.
620         verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L);
621     }
622
623     @Test
624     public void testTrimLogAfterCapture() {
625         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
626                 new MockRaftActorContext.MockPayload()), 9);
627
628         assertTrue(capture);
629
630         assertEquals(true, snapshotManager.isCapturing());
631
632         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
633         doReturn(20L).when(mockRaftActorContext).getLastApplied();
634         doReturn(true).when(mockReplicatedLog).isPresent(10);
635         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
636         doReturn(5L).when(replicatedLogEntry).getTerm();
637
638         snapshotManager.trimLog(10);
639
640         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
641         verify(mockReplicatedLog, never()).snapshotCommit();
642
643     }
644
645     @Test
646     public void testTrimLogAfterCaptureToInstall() {
647         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
648                 new MockRaftActorContext.MockPayload()), 9);
649
650         assertTrue(capture);
651
652         assertEquals(true, snapshotManager.isCapturing());
653
654         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
655         doReturn(20L).when(mockRaftActorContext).getLastApplied();
656         doReturn(true).when(mockReplicatedLog).isPresent(10);
657         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
658         doReturn(5L).when(replicatedLogEntry).getTerm();
659
660         snapshotManager.trimLog(10);
661
662         verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
663         verify(mockReplicatedLog, never()).snapshotCommit();
664
665     }
666
667     @Test
668     public void testLastAppliedTermInformationReader() {
669
670         LastAppliedTermInformationReader reader = new LastAppliedTermInformationReader();
671
672         doReturn(4L).when(mockReplicatedLog).getSnapshotTerm();
673         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
674
675         ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 6L,
676                 new MockRaftActorContext.MockPayload());
677
678         // No followers and valid lastLogEntry
679         reader.init(mockReplicatedLog, 1L, lastLogEntry, false);
680
681         assertEquals("getTerm", 6L, reader.getTerm());
682         assertEquals("getIndex", 9L, reader.getIndex());
683
684         // No followers and null lastLogEntry
685         reader.init(mockReplicatedLog, 1L, null, false);
686
687         assertEquals("getTerm", -1L, reader.getTerm());
688         assertEquals("getIndex", -1L, reader.getIndex());
689
690         // Followers and valid originalIndex entry
691         doReturn(new SimpleReplicatedLogEntry(8L, 5L,
692                 new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L);
693         reader.init(mockReplicatedLog, 8L, lastLogEntry, true);
694
695         assertEquals("getTerm", 5L, reader.getTerm());
696         assertEquals("getIndex", 8L, reader.getIndex());
697
698         // Followers and null originalIndex entry and valid snapshot index
699         reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
700
701         assertEquals("getTerm", 4L, reader.getTerm());
702         assertEquals("getIndex", 7L, reader.getIndex());
703
704         // Followers and null originalIndex entry and invalid snapshot index
705         doReturn(-1L).when(mockReplicatedLog).getSnapshotIndex();
706         reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
707
708         assertEquals("getTerm", -1L, reader.getTerm());
709         assertEquals("getIndex", -1L, reader.getIndex());
710     }
711 }