Bug 2890: Chunk AppendEntries when single payload size exceeds threshold
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / SnapshotManagerTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertArrayEquals;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertFalse;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyLong;
17 import static org.mockito.Matchers.anyObject;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
26
27 import akka.actor.ActorRef;
28 import akka.persistence.SnapshotSelectionCriteria;
29 import akka.testkit.TestActorRef;
30 import java.io.OutputStream;
31 import java.util.Arrays;
32 import java.util.Optional;
33 import java.util.function.Consumer;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.ArgumentCaptor;
38 import org.mockito.Mock;
39 import org.mockito.MockitoAnnotations;
40 import org.opendaylight.controller.cluster.DataPersistenceProvider;
41 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
42 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
43 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
44 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
45 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
46 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
47 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
48 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
49 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
50 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
51 import org.slf4j.LoggerFactory;
52
53 public class SnapshotManagerTest extends AbstractActorTest {
54
55     @Mock
56     private RaftActorContext mockRaftActorContext;
57
58     @Mock
59     private ConfigParams mockConfigParams;
60
61     @Mock
62     private ReplicatedLog mockReplicatedLog;
63
64     @Mock
65     private DataPersistenceProvider mockDataPersistenceProvider;
66
67     @Mock
68     private RaftActorBehavior mockRaftActorBehavior;
69
70     @Mock
71     private Consumer<Optional<OutputStream>> mockProcedure;
72
73     @Mock
74     private ElectionTerm mockElectionTerm;
75
76     private SnapshotManager snapshotManager;
77
78     private TestActorFactory factory;
79
80     private TestActorRef<MessageCollectorActor> actorRef;
81
82     @Before
83     public void setUp() {
84         MockitoAnnotations.initMocks(this);
85
86         doReturn(false).when(mockRaftActorContext).hasFollowers();
87         doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams();
88         doReturn(10L).when(mockConfigParams).getSnapshotBatchCount();
89         doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage();
90         doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog();
91         doReturn("123").when(mockRaftActorContext).getId();
92         doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider();
93         doReturn(mockRaftActorBehavior).when(mockRaftActorContext).getCurrentBehavior();
94         doReturn("123").when(mockRaftActorBehavior).getLeaderId();
95
96         doReturn(mockElectionTerm).when(mockRaftActorContext).getTermInformation();
97         doReturn(5L).when(mockElectionTerm).getCurrentTerm();
98         doReturn("member5").when(mockElectionTerm).getVotedFor();
99
100         doReturn(new FileBackedOutputStreamFactory(10000000, "target"))
101                 .when(mockRaftActorContext).getFileBackedOutputStreamFactory();
102
103         snapshotManager = new SnapshotManager(mockRaftActorContext, LoggerFactory.getLogger(this.getClass()));
104         factory = new TestActorFactory(getSystem());
105
106         actorRef = factory.createTestActor(MessageCollectorActor.props(), factory.generateActorId("test-"));
107         doReturn(actorRef).when(mockRaftActorContext).getActor();
108
109         snapshotManager.setCreateSnapshotConsumer(mockProcedure);
110     }
111
112     @After
113     public void tearDown() {
114         factory.close();
115     }
116
117     @Test
118     public void testConstruction() {
119         assertEquals(false, snapshotManager.isCapturing());
120     }
121
122     @SuppressWarnings({ "unchecked", "rawtypes" })
123     @Test
124     public void testCaptureToInstall() throws Exception {
125
126         // Force capturing toInstall = true
127         snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(0, 1,
128                 new MockRaftActorContext.MockPayload()), 0, "follower-1");
129
130         assertEquals(true, snapshotManager.isCapturing());
131
132         ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
133         verify(mockProcedure).accept(outputStream.capture());
134         assertEquals("isPresent", true, outputStream.getValue().isPresent());
135
136         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
137
138         // LastIndex and LastTerm are picked up from the lastLogEntry
139         assertEquals(0L, captureSnapshot.getLastIndex());
140         assertEquals(1L, captureSnapshot.getLastTerm());
141
142         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
143         assertEquals(0L, captureSnapshot.getLastAppliedIndex());
144         assertEquals(1L, captureSnapshot.getLastAppliedTerm());
145
146         //
147         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
148         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
149         actorRef.underlyingActor().clear();
150     }
151
152     @SuppressWarnings({ "rawtypes", "unchecked" })
153     @Test
154     public void testCapture() throws Exception {
155         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
156                 new MockRaftActorContext.MockPayload()), 9);
157
158         assertTrue(capture);
159
160         assertEquals(true, snapshotManager.isCapturing());
161
162         ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
163         verify(mockProcedure).accept(outputStream.capture());
164         assertEquals("isPresent", false, outputStream.getValue().isPresent());
165
166         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
167
168         // LastIndex and LastTerm are picked up from the lastLogEntry
169         assertEquals(9L, captureSnapshot.getLastIndex());
170         assertEquals(1L, captureSnapshot.getLastTerm());
171
172         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
173         assertEquals(9L, captureSnapshot.getLastAppliedIndex());
174         assertEquals(1L, captureSnapshot.getLastAppliedTerm());
175
176         //
177         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
178         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
179
180         actorRef.underlyingActor().clear();
181
182     }
183
184     @SuppressWarnings({ "unchecked", "rawtypes" })
185     @Test
186     public void testCaptureWithNullLastLogEntry() throws Exception {
187         boolean capture = snapshotManager.capture(null, 1);
188
189         assertTrue(capture);
190
191         assertEquals(true, snapshotManager.isCapturing());
192
193         ArgumentCaptor<Optional> outputStream = ArgumentCaptor.forClass(Optional.class);
194         verify(mockProcedure).accept(outputStream.capture());
195         assertEquals("isPresent", false, outputStream.getValue().isPresent());
196
197         CaptureSnapshot captureSnapshot = snapshotManager.getCaptureSnapshot();
198
199         // LastIndex and LastTerm are picked up from the lastLogEntry
200         assertEquals(-1L, captureSnapshot.getLastIndex());
201         assertEquals(-1L, captureSnapshot.getLastTerm());
202
203         // Since the actor does not have any followers (no peer addresses) lastApplied will be from lastLogEntry
204         assertEquals(-1L, captureSnapshot.getLastAppliedIndex());
205         assertEquals(-1L, captureSnapshot.getLastAppliedTerm());
206
207         //
208         assertEquals(-1L, captureSnapshot.getReplicatedToAllIndex());
209         assertEquals(-1L, captureSnapshot.getReplicatedToAllTerm());
210         actorRef.underlyingActor().clear();
211
212     }
213
214     @Test
215     public void testCaptureWithCreateProcedureError() throws Exception {
216         doThrow(new RuntimeException("mock")).when(mockProcedure).accept(anyObject());
217
218         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
219                 new MockRaftActorContext.MockPayload()), 9);
220
221         assertFalse(capture);
222
223         assertEquals(false, snapshotManager.isCapturing());
224
225         verify(mockProcedure).accept(anyObject());
226     }
227
228     @SuppressWarnings("unchecked")
229     @Test
230     public void testIllegalCapture() throws Exception {
231         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
232                 new MockRaftActorContext.MockPayload()), 9);
233
234         assertTrue(capture);
235
236         verify(mockProcedure).accept(anyObject());
237
238         reset(mockProcedure);
239
240         // This will not cause snapshot capture to start again
241         capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
242                 new MockRaftActorContext.MockPayload()), 9);
243
244         assertFalse(capture);
245
246         verify(mockProcedure, never()).accept(anyObject());
247     }
248
249     @Test
250     public void testPersistWhenReplicatedToAllIndexMinusOne() throws Exception {
251         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
252         doReturn(1L).when(mockReplicatedLog).getSnapshotTerm();
253
254         doReturn(true).when(mockRaftActorContext).hasFollowers();
255
256         doReturn(8L).when(mockRaftActorContext).getLastApplied();
257
258         ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 3L, new MockRaftActorContext.MockPayload());
259
260         ReplicatedLogEntry lastAppliedEntry = new SimpleReplicatedLogEntry(
261                 8L, 2L, new MockRaftActorContext.MockPayload());
262
263         doReturn(lastAppliedEntry).when(mockReplicatedLog).get(8L);
264         doReturn(Arrays.asList(lastLogEntry)).when(mockReplicatedLog).getFrom(9L);
265
266         // when replicatedToAllIndex = -1
267         snapshotManager.capture(lastLogEntry, -1);
268
269         ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
270         snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
271
272         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
273         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
274
275         Snapshot snapshot = snapshotArgumentCaptor.getValue();
276
277         assertEquals("getLastTerm", 3L, snapshot.getLastTerm());
278         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
279         assertEquals("getLastAppliedTerm", 2L, snapshot.getLastAppliedTerm());
280         assertEquals("getLastAppliedIndex", 8L, snapshot.getLastAppliedIndex());
281         assertEquals("getState", snapshotState, snapshot.getState());
282         assertEquals("getUnAppliedEntries", Arrays.asList(lastLogEntry), snapshot.getUnAppliedEntries());
283         assertEquals("electionTerm", mockElectionTerm.getCurrentTerm(), snapshot.getElectionTerm());
284         assertEquals("electionVotedFor", mockElectionTerm.getVotedFor(), snapshot.getElectionVotedFor());
285
286         verify(mockReplicatedLog).snapshotPreCommit(7L, 1L);
287     }
288
289     @Test
290     public void testPersistWhenReplicatedToAllIndexNotMinus() throws Exception {
291         doReturn(45L).when(mockReplicatedLog).getSnapshotIndex();
292         doReturn(6L).when(mockReplicatedLog).getSnapshotTerm();
293         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
294         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(9);
295         doReturn(6L).when(replicatedLogEntry).getTerm();
296         doReturn(9L).when(replicatedLogEntry).getIndex();
297
298         // when replicatedToAllIndex != -1
299         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), 9);
300
301         ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
302         snapshotManager.persist(snapshotState, Optional.empty(), Runtime.getRuntime().totalMemory());
303
304         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
305         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
306
307         Snapshot snapshot = snapshotArgumentCaptor.getValue();
308
309         assertEquals("getLastTerm", 6L, snapshot.getLastTerm());
310         assertEquals("getLastIndex", 9L, snapshot.getLastIndex());
311         assertEquals("getLastAppliedTerm", 6L, snapshot.getLastAppliedTerm());
312         assertEquals("getLastAppliedIndex", 9L, snapshot.getLastAppliedIndex());
313         assertEquals("getState", snapshotState, snapshot.getState());
314         assertEquals("getUnAppliedEntries size", 0, snapshot.getUnAppliedEntries().size());
315
316         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
317
318         verify(mockRaftActorBehavior).setReplicatedToAllIndex(9);
319     }
320
321     @Test
322     public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold() {
323         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
324
325         // when replicatedToAllIndex = -1
326         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
327
328         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
329
330         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
331
332         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
333
334         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
335     }
336
337     @Test
338     public void testPersistWhenReplicatedLogSizeExceedsSnapshotBatchCount() {
339         doReturn(10L).when(mockReplicatedLog).size(); // matches snapshotBatchCount
340         doReturn(100).when(mockReplicatedLog).dataSize();
341
342         doReturn(5L).when(mockReplicatedLog).getSnapshotIndex();
343         doReturn(5L).when(mockReplicatedLog).getSnapshotTerm();
344
345         long replicatedToAllIndex = 1;
346         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
347         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex);
348         doReturn(6L).when(replicatedLogEntry).getTerm();
349         doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex();
350
351         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6,
352                 new MockRaftActorContext.MockPayload()), replicatedToAllIndex);
353
354         snapshotManager.persist(ByteState.empty(), Optional.empty(), 2000000L);
355
356         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
357
358         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
359
360         verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex);
361     }
362
363     @SuppressWarnings({ "rawtypes", "unchecked" })
364     @Test
365     public void testPersistSendInstallSnapshot() throws Exception {
366         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
367         doNothing().when(mockProcedure).accept(anyObject());
368
369         // when replicatedToAllIndex = -1
370         boolean capture = snapshotManager.captureToInstall(new SimpleReplicatedLogEntry(9, 6,
371                 new MockRaftActorContext.MockPayload()), -1, "follower-1");
372
373         assertTrue(capture);
374
375         ByteState snapshotState = ByteState.of(new byte[] {1,2,3,4,5,6,7,8,9,10});
376
377         ArgumentCaptor<Optional> installSnapshotStreamCapture = ArgumentCaptor.forClass(Optional.class);
378         verify(mockProcedure).accept(installSnapshotStreamCapture.capture());
379
380         Optional<OutputStream> installSnapshotStream = installSnapshotStreamCapture.getValue();
381         assertEquals("isPresent", true, installSnapshotStream.isPresent());
382
383         installSnapshotStream.get().write(snapshotState.getBytes());
384
385         snapshotManager.persist(snapshotState, installSnapshotStream, Runtime.getRuntime().totalMemory());
386
387         assertEquals(true, snapshotManager.isCapturing());
388
389         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
390
391         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
392
393         ArgumentCaptor<SendInstallSnapshot> sendInstallSnapshotArgumentCaptor
394                 = ArgumentCaptor.forClass(SendInstallSnapshot.class);
395
396         verify(mockRaftActorBehavior).handleMessage(any(ActorRef.class), sendInstallSnapshotArgumentCaptor.capture());
397
398         SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
399
400         assertEquals("state", snapshotState, sendInstallSnapshot.getSnapshot().getState());
401         assertArrayEquals("state", snapshotState.getBytes(), sendInstallSnapshot.getSnapshotBytes().read());
402     }
403
404     @Test
405     public void testCallingPersistWithoutCaptureWillDoNothing() {
406         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
407
408         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
409
410         verify(mockReplicatedLog, never()).snapshotPreCommit(9L, 6L);
411
412         verify(mockRaftActorBehavior, never()).handleMessage(any(ActorRef.class), any(SendInstallSnapshot.class));
413     }
414
415     @Test
416     public void testCallingPersistTwiceWillDoNoHarm() {
417         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
418
419         // when replicatedToAllIndex = -1
420         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
421
422         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
423
424         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
425
426         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
427
428         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
429     }
430
431     @Test
432     public void testCommit() {
433         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
434
435         // when replicatedToAllIndex = -1
436         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
437
438         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
439
440         assertEquals(true, snapshotManager.isCapturing());
441
442         snapshotManager.commit(100L, 1234L);
443
444         assertEquals(false, snapshotManager.isCapturing());
445
446         verify(mockReplicatedLog).snapshotCommit();
447
448         verify(mockDataPersistenceProvider).deleteMessages(50L);
449
450         ArgumentCaptor<SnapshotSelectionCriteria> criteriaCaptor =
451                 ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
452
453         verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
454
455         assertEquals(100L, criteriaCaptor.getValue().maxSequenceNr());
456         assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
457
458         MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
459     }
460
461     @Test
462     public void testCommitBeforePersist() {
463         // when replicatedToAllIndex = -1
464         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
465
466         snapshotManager.commit(100L, 0);
467
468         verify(mockReplicatedLog, never()).snapshotCommit();
469
470         verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
471
472         verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
473
474     }
475
476     @Test
477     public void testCommitBeforeCapture() {
478         snapshotManager.commit(100L, 0);
479
480         verify(mockReplicatedLog, never()).snapshotCommit();
481
482         verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
483
484         verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
485
486     }
487
488     @Test
489     public void testCallingCommitMultipleTimesCausesNoHarm() {
490         doReturn(50L).when(mockDataPersistenceProvider).getLastSequenceNumber();
491
492         // when replicatedToAllIndex = -1
493         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
494
495         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
496
497         snapshotManager.commit(100L, 0);
498
499         snapshotManager.commit(100L, 0);
500
501         verify(mockReplicatedLog, times(1)).snapshotCommit();
502
503         verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
504
505         verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
506     }
507
508     @Test
509     public void testRollback() {
510         // when replicatedToAllIndex = -1
511         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
512
513         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
514
515         snapshotManager.rollback();
516
517         verify(mockReplicatedLog).snapshotRollback();
518
519         MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
520     }
521
522
523     @Test
524     public void testRollbackBeforePersist() {
525         // when replicatedToAllIndex = -1
526         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
527
528         snapshotManager.rollback();
529
530         verify(mockReplicatedLog, never()).snapshotRollback();
531     }
532
533     @Test
534     public void testRollbackBeforeCapture() {
535         snapshotManager.rollback();
536
537         verify(mockReplicatedLog, never()).snapshotRollback();
538     }
539
540     @Test
541     public void testCallingRollbackMultipleTimesCausesNoHarm() {
542         // when replicatedToAllIndex = -1
543         snapshotManager.capture(new SimpleReplicatedLogEntry(9, 6, new MockRaftActorContext.MockPayload()), -1);
544
545         snapshotManager.persist(ByteState.empty(), Optional.empty(), Runtime.getRuntime().totalMemory());
546
547         snapshotManager.rollback();
548
549         snapshotManager.rollback();
550
551         verify(mockReplicatedLog, times(1)).snapshotRollback();
552     }
553
554     @Test
555     public void testTrimLogWhenTrimIndexLessThanLastApplied() {
556         doReturn(20L).when(mockRaftActorContext).getLastApplied();
557
558         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
559         doReturn(true).when(mockReplicatedLog).isPresent(10);
560         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
561         doReturn(5L).when(replicatedLogEntry).getTerm();
562
563         long retIndex = snapshotManager.trimLog(10);
564         assertEquals("return index", 10L, retIndex);
565
566         verify(mockReplicatedLog).snapshotPreCommit(10, 5);
567         verify(mockReplicatedLog).snapshotCommit();
568
569         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
570     }
571
572     @Test
573     public void testTrimLogWhenLastAppliedNotSet() {
574         doReturn(-1L).when(mockRaftActorContext).getLastApplied();
575
576         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
577         doReturn(true).when(mockReplicatedLog).isPresent(10);
578         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
579         doReturn(5L).when(replicatedLogEntry).getTerm();
580
581         long retIndex = snapshotManager.trimLog(10);
582         assertEquals("return index", -1L, retIndex);
583
584         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
585         verify(mockReplicatedLog, never()).snapshotCommit();
586
587         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
588     }
589
590     @Test
591     public void testTrimLogWhenLastAppliedZero() {
592         doReturn(0L).when(mockRaftActorContext).getLastApplied();
593
594         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
595         doReturn(true).when(mockReplicatedLog).isPresent(10);
596         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
597         doReturn(5L).when(replicatedLogEntry).getTerm();
598
599         long retIndex = snapshotManager.trimLog(10);
600         assertEquals("return index", -1L, retIndex);
601
602         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
603         verify(mockReplicatedLog, never()).snapshotCommit();
604
605         verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong());
606     }
607
608     @Test
609     public void testTrimLogWhenTrimIndexNotPresent() {
610         doReturn(20L).when(mockRaftActorContext).getLastApplied();
611
612         doReturn(false).when(mockReplicatedLog).isPresent(10);
613
614         long retIndex = snapshotManager.trimLog(10);
615         assertEquals("return index", -1L, retIndex);
616
617         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
618         verify(mockReplicatedLog, never()).snapshotCommit();
619
620         // Trim index is greater than replicatedToAllIndex so should update it.
621         verify(mockRaftActorBehavior).setReplicatedToAllIndex(10L);
622     }
623
624     @Test
625     public void testTrimLogAfterCapture() {
626         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
627                 new MockRaftActorContext.MockPayload()), 9);
628
629         assertTrue(capture);
630
631         assertEquals(true, snapshotManager.isCapturing());
632
633         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
634         doReturn(20L).when(mockRaftActorContext).getLastApplied();
635         doReturn(true).when(mockReplicatedLog).isPresent(10);
636         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
637         doReturn(5L).when(replicatedLogEntry).getTerm();
638
639         snapshotManager.trimLog(10);
640
641         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
642         verify(mockReplicatedLog, never()).snapshotCommit();
643
644     }
645
646     @Test
647     public void testTrimLogAfterCaptureToInstall() {
648         boolean capture = snapshotManager.capture(new SimpleReplicatedLogEntry(9, 1,
649                 new MockRaftActorContext.MockPayload()), 9);
650
651         assertTrue(capture);
652
653         assertEquals(true, snapshotManager.isCapturing());
654
655         ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class);
656         doReturn(20L).when(mockRaftActorContext).getLastApplied();
657         doReturn(true).when(mockReplicatedLog).isPresent(10);
658         doReturn(replicatedLogEntry).when(mockReplicatedLog).get(10);
659         doReturn(5L).when(replicatedLogEntry).getTerm();
660
661         snapshotManager.trimLog(10);
662
663         verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
664         verify(mockReplicatedLog, never()).snapshotCommit();
665
666     }
667
668     @Test
669     public void testLastAppliedTermInformationReader() {
670
671         LastAppliedTermInformationReader reader = new LastAppliedTermInformationReader();
672
673         doReturn(4L).when(mockReplicatedLog).getSnapshotTerm();
674         doReturn(7L).when(mockReplicatedLog).getSnapshotIndex();
675
676         ReplicatedLogEntry lastLogEntry = new SimpleReplicatedLogEntry(9L, 6L,
677                 new MockRaftActorContext.MockPayload());
678
679         // No followers and valid lastLogEntry
680         reader.init(mockReplicatedLog, 1L, lastLogEntry, false);
681
682         assertEquals("getTerm", 6L, reader.getTerm());
683         assertEquals("getIndex", 9L, reader.getIndex());
684
685         // No followers and null lastLogEntry
686         reader.init(mockReplicatedLog, 1L, null, false);
687
688         assertEquals("getTerm", -1L, reader.getTerm());
689         assertEquals("getIndex", -1L, reader.getIndex());
690
691         // Followers and valid originalIndex entry
692         doReturn(new SimpleReplicatedLogEntry(8L, 5L,
693                 new MockRaftActorContext.MockPayload())).when(mockReplicatedLog).get(8L);
694         reader.init(mockReplicatedLog, 8L, lastLogEntry, true);
695
696         assertEquals("getTerm", 5L, reader.getTerm());
697         assertEquals("getIndex", 8L, reader.getIndex());
698
699         // Followers and null originalIndex entry and valid snapshot index
700         reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
701
702         assertEquals("getTerm", 4L, reader.getTerm());
703         assertEquals("getIndex", 7L, reader.getIndex());
704
705         // Followers and null originalIndex entry and invalid snapshot index
706         doReturn(-1L).when(mockReplicatedLog).getSnapshotIndex();
707         reader.init(mockReplicatedLog, 7L, lastLogEntry, true);
708
709         assertEquals("getTerm", -1L, reader.getTerm());
710         assertEquals("getIndex", -1L, reader.getIndex());
711     }
712 }