Add unit tests for RaftActorRecoverySupport
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Matchers.any;
9 import static org.mockito.Matchers.anyObject;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.same;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Terminated;
20 import akka.japi.Procedure;
21 import akka.persistence.SaveSnapshotFailure;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import akka.persistence.SnapshotSelectionCriteria;
26 import akka.testkit.JavaTestKit;
27 import akka.testkit.TestActorRef;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.google.protobuf.ByteString;
32 import java.io.ByteArrayOutputStream;
33 import java.io.ObjectOutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.DataPersistenceProvider;
46 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
47 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
48 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
49 import org.opendaylight.controller.cluster.notifications.RoleChanged;
50 import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
51 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
56 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
57 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
58 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
59 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
60 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class RaftActorTest extends AbstractActorTest {
70
71     private TestActorFactory factory;
72
73     @Before
74     public void setUp(){
75         factory = new TestActorFactory(getSystem());
76     }
77
78     @After
79     public void tearDown() throws Exception {
80         factory.close();
81         InMemoryJournal.clear();
82         InMemorySnapshotStore.clear();
83     }
84
85     @Test
86     public void testConstruction() {
87         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
88     }
89
90     @Test
91     public void testFindLeaderWhenLeaderIsSelf(){
92         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
93         kit.waitUntilLeader();
94     }
95
96     @Test
97     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
98         new JavaTestKit(getSystem()) {{
99             String persistenceId = factory.generateActorId("follower-");
100
101             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
102
103             // Set the heartbeat interval high to essentially disable election otherwise the test
104             // may fail if the actor is switched to Leader and the commitIndex is set to the last
105             // log entry.
106             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
107
108             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
109                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
110                     Optional.<ConfigParams>of(config)), persistenceId);
111
112             watch(followerActor);
113
114             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
115             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
116                     new MockRaftActorContext.MockPayload("E"));
117             snapshotUnappliedEntries.add(entry1);
118
119             int lastAppliedDuringSnapshotCapture = 3;
120             int lastIndexDuringSnapshotCapture = 4;
121
122             // 4 messages as part of snapshot, which are applied to state
123             ByteString snapshotBytes = fromObject(Arrays.asList(
124                     new MockRaftActorContext.MockPayload("A"),
125                     new MockRaftActorContext.MockPayload("B"),
126                     new MockRaftActorContext.MockPayload("C"),
127                     new MockRaftActorContext.MockPayload("D")));
128
129             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
130                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
131                     lastAppliedDuringSnapshotCapture, 1);
132             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
133
134             // add more entries after snapshot is taken
135             List<ReplicatedLogEntry> entries = new ArrayList<>();
136             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
137                     new MockRaftActorContext.MockPayload("F", 2));
138             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
139                     new MockRaftActorContext.MockPayload("G", 3));
140             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
141                     new MockRaftActorContext.MockPayload("H", 4));
142             entries.add(entry2);
143             entries.add(entry3);
144             entries.add(entry4);
145
146             int lastAppliedToState = 5;
147             int lastIndex = 7;
148
149             InMemoryJournal.addEntry(persistenceId, 5, entry2);
150             // 2 entries are applied to state besides the 4 entries in snapshot
151             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
152             InMemoryJournal.addEntry(persistenceId, 7, entry3);
153             InMemoryJournal.addEntry(persistenceId, 8, entry4);
154
155             // kill the actor
156             followerActor.tell(PoisonPill.getInstance(), null);
157             expectMsgClass(duration("5 seconds"), Terminated.class);
158
159             unwatch(followerActor);
160
161             //reinstate the actor
162             TestActorRef<MockRaftActor> ref = factory.createTestActor(
163                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
164                             Optional.<ConfigParams>of(config)));
165
166             MockRaftActor mockRaftActor = ref.underlyingActor();
167
168             mockRaftActor.waitForRecoveryComplete();
169
170             RaftActorContext context = mockRaftActor.getRaftActorContext();
171             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
172                     context.getReplicatedLog().size());
173             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
174             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
175             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
176             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
177             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
178
179             mockRaftActor.waitForInitializeBehaviorComplete();
180
181             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
182         }};
183     }
184
185     @Test
186     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
187         new JavaTestKit(getSystem()) {{
188             String persistenceId = factory.generateActorId("follower-");
189
190             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
191
192             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
193
194             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
195                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
196                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
197
198             MockRaftActor mockRaftActor = ref.underlyingActor();
199
200             mockRaftActor.waitForRecoveryComplete();
201
202             mockRaftActor.waitForInitializeBehaviorComplete();
203
204             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
205         }};
206     }
207
208     @Test
209     public void testRaftActorForwardsToRaftActorRecoverySupport() {
210         String persistenceId = factory.generateActorId("leader-");
211
212         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
213
214         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
215
216         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
217                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
218
219         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
220
221         // Wait for akka's recovery to complete so it doesn't interfere.
222         mockRaftActor.waitForRecoveryComplete();
223
224         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
225         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
226
227         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
228         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
229         mockRaftActor.handleRecover(snapshotOffer);
230
231         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
232                 1, new MockRaftActorContext.MockPayload("1", 5));
233         mockRaftActor.handleRecover(logEntry);
234
235         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
236         mockRaftActor.handleRecover(applyJournalEntries);
237
238         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
239         mockRaftActor.handleRecover(applyLogEntries);
240
241         DeleteEntries deleteEntries = new DeleteEntries(1);
242         mockRaftActor.handleRecover(deleteEntries);
243
244         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
245         mockRaftActor.handleRecover(updateElectionTerm);
246
247         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
248         verify(mockSupport).handleRecoveryMessage(same(logEntry));
249         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
250         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
251         verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
252         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
253     }
254
255     @Test
256     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
257         new JavaTestKit(getSystem()) {
258             {
259                 String persistenceId = factory.generateActorId("leader-");
260
261                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
262
263                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
264
265                 CountDownLatch persistLatch = new CountDownLatch(1);
266                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
267                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
268
269                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
270                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
271
272                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
273
274                 mockRaftActor.waitForInitializeBehaviorComplete();
275
276                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
277
278                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
279             }
280         };
281     }
282
283     @Test
284     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
285         new JavaTestKit(getSystem()) {
286             {
287                 String persistenceId = factory.generateActorId("leader-");
288
289                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
290
291                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
292
293                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
294
295                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
296                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
297
298                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
299
300                 mockRaftActor.waitForInitializeBehaviorComplete();
301
302                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
303
304                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
305
306                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
307             }
308         };
309     }
310
311     @Test
312     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
313         new JavaTestKit(getSystem()) {
314             {
315                 String persistenceId = factory.generateActorId("leader-");
316
317                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
318
319                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
320
321                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
322
323                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
324                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
325
326                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
327
328                 mockRaftActor.waitForInitializeBehaviorComplete();
329
330                 mockRaftActor.waitUntilLeader();
331
332                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
333
334                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
335
336                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
337             }
338         };
339     }
340
341     @Test
342     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
343         new JavaTestKit(getSystem()) {
344             {
345                 String persistenceId = factory.generateActorId("leader-");
346
347                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
348
349                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
350
351                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
352
353                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
354                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
355
356                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
357
358                 mockRaftActor.waitForInitializeBehaviorComplete();
359
360                 mockRaftActor.waitUntilLeader();
361
362                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
363
364                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
365
366             }
367
368         };
369     }
370
371     @Test
372     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
373         new JavaTestKit(getSystem()) {
374             {
375                 String persistenceId = factory.generateActorId("leader-");
376
377                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
378
379                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
380
381                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
382
383                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
384                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
385                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
386
387                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
388
389                 mockRaftActor.waitForInitializeBehaviorComplete();
390
391                 ByteString snapshotBytes = fromObject(Arrays.asList(
392                         new MockRaftActorContext.MockPayload("A"),
393                         new MockRaftActorContext.MockPayload("B"),
394                         new MockRaftActorContext.MockPayload("C"),
395                         new MockRaftActorContext.MockPayload("D")));
396
397                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
398
399                 raftActorContext.getSnapshotManager().capture(
400                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
401                                 new MockRaftActorContext.MockPayload("D")), -1);
402
403                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
404
405                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
406
407                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
408
409             }
410         };
411     }
412
413     @Test
414     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
415         new JavaTestKit(getSystem()) {
416             {
417                 String persistenceId = factory.generateActorId("leader-");
418
419                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
420
421                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
422
423                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
424
425                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
426                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
427
428                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
429
430                 mockRaftActor.waitForInitializeBehaviorComplete();
431                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
432
433                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
434                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
435                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
436                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
437                 mockRaftActor.getReplicatedLog().append(lastEntry);
438
439                 ByteString snapshotBytes = fromObject(Arrays.asList(
440                         new MockRaftActorContext.MockPayload("A"),
441                         new MockRaftActorContext.MockPayload("B"),
442                         new MockRaftActorContext.MockPayload("C"),
443                         new MockRaftActorContext.MockPayload("D")));
444
445                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
446                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
447
448                 long replicatedToAllIndex = 1;
449
450                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
451
452                 verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
453
454                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
455
456                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
457
458                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
459
460                 verify(dataPersistenceProvider).deleteMessages(100);
461
462                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
463                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
464
465                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
466                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
467                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
468
469                 // Index 2 will not be in the log because it was removed due to snapshotting
470                 assertNull(mockRaftActor.getReplicatedLog().get(1));
471                 assertNull(mockRaftActor.getReplicatedLog().get(0));
472
473             }
474         };
475     }
476
477     @Test
478     public void testApplyState() throws Exception {
479
480         new JavaTestKit(getSystem()) {
481             {
482                 String persistenceId = factory.generateActorId("leader-");
483
484                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
485
486                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
487
488                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
489
490                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
491                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
492
493                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
494
495                 mockRaftActor.waitForInitializeBehaviorComplete();
496
497                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
498                         new MockRaftActorContext.MockPayload("F"));
499
500                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
501
502                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
503
504             }
505         };
506     }
507
508     @Test
509     public void testApplySnapshot() throws Exception {
510         new JavaTestKit(getSystem()) {
511             {
512                 String persistenceId = factory.generateActorId("leader-");
513
514                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
515
516                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
517
518                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
519
520                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
521                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
522
523                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
524
525                 mockRaftActor.waitForInitializeBehaviorComplete();
526
527                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
528
529                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
530                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
531                 oldReplicatedLog.append(
532                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
533                                 mock(Payload.class)));
534
535                 ByteString snapshotBytes = fromObject(Arrays.asList(
536                         new MockRaftActorContext.MockPayload("A"),
537                         new MockRaftActorContext.MockPayload("B"),
538                         new MockRaftActorContext.MockPayload("C"),
539                         new MockRaftActorContext.MockPayload("D")));
540
541                 Snapshot snapshot = mock(Snapshot.class);
542
543                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
544
545                 doReturn(3L).when(snapshot).getLastAppliedIndex();
546
547                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
548
549                 verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
550
551                 assertTrue("The replicatedLog should have changed",
552                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
553
554                 assertEquals("lastApplied should be same as in the snapshot",
555                         (Long) 3L, mockRaftActor.getLastApplied());
556
557                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
558
559             }
560         };
561     }
562
563     @Test
564     public void testSaveSnapshotFailure() throws Exception {
565         new JavaTestKit(getSystem()) {
566             {
567                 String persistenceId = factory.generateActorId("leader-");
568
569                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
570
571                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
572
573                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
574
575                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
576                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
577
578                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
579
580                 mockRaftActor.waitForInitializeBehaviorComplete();
581
582                 ByteString snapshotBytes = fromObject(Arrays.asList(
583                         new MockRaftActorContext.MockPayload("A"),
584                         new MockRaftActorContext.MockPayload("B"),
585                         new MockRaftActorContext.MockPayload("C"),
586                         new MockRaftActorContext.MockPayload("D")));
587
588                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
589
590                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
591
592                 raftActorContext.getSnapshotManager().capture(
593                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
594                                 new MockRaftActorContext.MockPayload("D")), 1);
595
596                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
597
598                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
599                         new Exception()));
600
601                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
602                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
603
604             }
605         };
606     }
607
608     @Test
609     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
610         new JavaTestKit(getSystem()) {{
611             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
612                     Props.create(MessageCollectorActor.class));
613             MessageCollectorActor.waitUntilReady(notifierActor);
614
615             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
616             long heartBeatInterval = 100;
617             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
618             config.setElectionTimeoutFactor(20);
619
620             String persistenceId = factory.generateActorId("notifier-");
621
622             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
623                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
624                     new NonPersistentDataProvider()), persistenceId);
625
626             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
627
628
629             // check if the notifier got a role change from null to Follower
630             RoleChanged raftRoleChanged = matches.get(0);
631             assertEquals(persistenceId, raftRoleChanged.getMemberId());
632             assertNull(raftRoleChanged.getOldRole());
633             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
634
635             // check if the notifier got a role change from Follower to Candidate
636             raftRoleChanged = matches.get(1);
637             assertEquals(persistenceId, raftRoleChanged.getMemberId());
638             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
639             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
640
641             // check if the notifier got a role change from Candidate to Leader
642             raftRoleChanged = matches.get(2);
643             assertEquals(persistenceId, raftRoleChanged.getMemberId());
644             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
645             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
646
647             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
648                     notifierActor, LeaderStateChanged.class);
649
650             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
651
652             notifierActor.underlyingActor().clear();
653
654             MockRaftActor raftActor = raftActorRef.underlyingActor();
655             final String newLeaderId = "new-leader";
656             Follower follower = new Follower(raftActor.getRaftActorContext()) {
657                 @Override
658                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
659                     leaderId = newLeaderId;
660                     return this;
661                 }
662             };
663
664             raftActor.changeCurrentBehavior(follower);
665
666             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
667             assertEquals(persistenceId, leaderStateChange.getMemberId());
668             assertEquals(null, leaderStateChange.getLeaderId());
669
670             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
671             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
672             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
673
674             notifierActor.underlyingActor().clear();
675
676             raftActor.handleCommand("any");
677
678             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
679             assertEquals(persistenceId, leaderStateChange.getMemberId());
680             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
681         }};
682     }
683
684     @Test
685     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
686         new JavaTestKit(getSystem()) {{
687             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
688             MessageCollectorActor.waitUntilReady(notifierActor);
689
690             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
691             long heartBeatInterval = 100;
692             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
693             config.setElectionTimeoutFactor(1);
694
695             String persistenceId = factory.generateActorId("notifier-");
696
697             factory.createActor(MockRaftActor.props(persistenceId,
698                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
699
700             List<RoleChanged> matches =  null;
701             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
702                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
703                 assertNotNull(matches);
704                 if(matches.size() == 3) {
705                     break;
706                 }
707                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
708             }
709
710             assertEquals(2, matches.size());
711
712             // check if the notifier got a role change from null to Follower
713             RoleChanged raftRoleChanged = matches.get(0);
714             assertEquals(persistenceId, raftRoleChanged.getMemberId());
715             assertNull(raftRoleChanged.getOldRole());
716             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
717
718             // check if the notifier got a role change from Follower to Candidate
719             raftRoleChanged = matches.get(1);
720             assertEquals(persistenceId, raftRoleChanged.getMemberId());
721             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
722             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
723
724         }};
725     }
726
727     @Test
728     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
729         new JavaTestKit(getSystem()) {
730             {
731                 String persistenceId = factory.generateActorId("leader-");
732                 String follower1Id = factory.generateActorId("follower-");
733
734                 ActorRef followerActor1 =
735                         factory.createActor(Props.create(MessageCollectorActor.class));
736
737                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
738                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
739                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
740
741                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
742
743                 Map<String, String> peerAddresses = new HashMap<>();
744                 peerAddresses.put(follower1Id, followerActor1.path().toString());
745
746                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
747                         MockRaftActor.props(persistenceId, peerAddresses,
748                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
749
750                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
751
752                 leaderActor.getRaftActorContext().setCommitIndex(4);
753                 leaderActor.getRaftActorContext().setLastApplied(4);
754                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
755
756                 leaderActor.waitForInitializeBehaviorComplete();
757
758                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
759
760                 Leader leader = new Leader(leaderActor.getRaftActorContext());
761                 leaderActor.setCurrentBehavior(leader);
762                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
763
764                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
765                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
766
767                 assertEquals(8, leaderActor.getReplicatedLog().size());
768
769                 leaderActor.getRaftActorContext().getSnapshotManager()
770                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
771                                 new MockRaftActorContext.MockPayload("x")), 4);
772
773                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
774
775                 assertEquals(8, leaderActor.getReplicatedLog().size());
776
777                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
778                 //fake snapshot on index 5
779                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
780
781                 assertEquals(8, leaderActor.getReplicatedLog().size());
782
783                 //fake snapshot on index 6
784                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
785                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
786                 assertEquals(8, leaderActor.getReplicatedLog().size());
787
788                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
789
790                 assertEquals(8, leaderActor.getReplicatedLog().size());
791
792                 ByteString snapshotBytes = fromObject(Arrays.asList(
793                         new MockRaftActorContext.MockPayload("foo-0"),
794                         new MockRaftActorContext.MockPayload("foo-1"),
795                         new MockRaftActorContext.MockPayload("foo-2"),
796                         new MockRaftActorContext.MockPayload("foo-3"),
797                         new MockRaftActorContext.MockPayload("foo-4")));
798
799                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
800                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
801
802                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
803
804                 // The commit is needed to complete the snapshot creation process
805                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
806
807                 // capture snapshot reply should remove the snapshotted entries only
808                 assertEquals(3, leaderActor.getReplicatedLog().size());
809                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
810
811                 // add another non-replicated entry
812                 leaderActor.getReplicatedLog().append(
813                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
814
815                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
816                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
817                 assertEquals(2, leaderActor.getReplicatedLog().size());
818                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
819
820             }
821         };
822     }
823
824     @Test
825     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
826         new JavaTestKit(getSystem()) {
827             {
828                 String persistenceId = factory.generateActorId("follower-");
829                 String leaderId = factory.generateActorId("leader-");
830
831
832                 ActorRef leaderActor1 =
833                         factory.createActor(Props.create(MessageCollectorActor.class));
834
835                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
836                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
837                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
838
839                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
840
841                 Map<String, String> peerAddresses = new HashMap<>();
842                 peerAddresses.put(leaderId, leaderActor1.path().toString());
843
844                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
845                         MockRaftActor.props(persistenceId, peerAddresses,
846                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
847
848                 MockRaftActor followerActor = mockActorRef.underlyingActor();
849                 followerActor.getRaftActorContext().setCommitIndex(4);
850                 followerActor.getRaftActorContext().setLastApplied(4);
851                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
852
853                 followerActor.waitForInitializeBehaviorComplete();
854
855
856                 Follower follower = new Follower(followerActor.getRaftActorContext());
857                 followerActor.setCurrentBehavior(follower);
858                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
859
860                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
861                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
862                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
863
864                 // log has indices 0-5
865                 assertEquals(6, followerActor.getReplicatedLog().size());
866
867                 //snapshot on 4
868                 followerActor.getRaftActorContext().getSnapshotManager().capture(
869                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
870                                 new MockRaftActorContext.MockPayload("D")), 4);
871
872                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
873
874                 assertEquals(6, followerActor.getReplicatedLog().size());
875
876                 //fake snapshot on index 6
877                 List<ReplicatedLogEntry> entries =
878                         Arrays.asList(
879                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
880                                         new MockRaftActorContext.MockPayload("foo-6"))
881                         );
882                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
883                 assertEquals(7, followerActor.getReplicatedLog().size());
884
885                 //fake snapshot on index 7
886                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
887
888                 entries =
889                         Arrays.asList(
890                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
891                                         new MockRaftActorContext.MockPayload("foo-7"))
892                         );
893                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
894                 assertEquals(8, followerActor.getReplicatedLog().size());
895
896                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
897
898
899                 ByteString snapshotBytes = fromObject(Arrays.asList(
900                         new MockRaftActorContext.MockPayload("foo-0"),
901                         new MockRaftActorContext.MockPayload("foo-1"),
902                         new MockRaftActorContext.MockPayload("foo-2"),
903                         new MockRaftActorContext.MockPayload("foo-3"),
904                         new MockRaftActorContext.MockPayload("foo-4")));
905                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
906                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
907
908                 // The commit is needed to complete the snapshot creation process
909                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
910
911                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
912                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
913                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
914
915                 entries =
916                         Arrays.asList(
917                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
918                                         new MockRaftActorContext.MockPayload("foo-7"))
919                         );
920                 // send an additional entry 8 with leaderCommit = 7
921                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
922
923                 // 7 and 8, as lastapplied is 7
924                 assertEquals(2, followerActor.getReplicatedLog().size());
925
926             }
927         };
928     }
929
930     @Test
931     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
932         new JavaTestKit(getSystem()) {
933             {
934                 String persistenceId = factory.generateActorId("leader-");
935                 String follower1Id = factory.generateActorId("follower-");
936                 String follower2Id = factory.generateActorId("follower-");
937
938                 ActorRef followerActor1 =
939                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
940                 ActorRef followerActor2 =
941                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
942
943                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
944                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
945                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
946
947                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
948
949                 Map<String, String> peerAddresses = new HashMap<>();
950                 peerAddresses.put(follower1Id, followerActor1.path().toString());
951                 peerAddresses.put(follower2Id, followerActor2.path().toString());
952
953                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
954                         MockRaftActor.props(persistenceId, peerAddresses,
955                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
956
957                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
958                 leaderActor.getRaftActorContext().setCommitIndex(9);
959                 leaderActor.getRaftActorContext().setLastApplied(9);
960                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
961
962                 leaderActor.waitForInitializeBehaviorComplete();
963
964                 Leader leader = new Leader(leaderActor.getRaftActorContext());
965                 leaderActor.setCurrentBehavior(leader);
966                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
967
968                 // create 5 entries in the log
969                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
970                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
971
972                 //set the snapshot index to 4 , 0 to 4 are snapshotted
973                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
974                 //setting replicatedToAllIndex = 9, for the log to clear
975                 leader.setReplicatedToAllIndex(9);
976                 assertEquals(5, leaderActor.getReplicatedLog().size());
977                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
978
979                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
980                 assertEquals(5, leaderActor.getReplicatedLog().size());
981                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
982
983                 // set the 2nd follower nextIndex to 1 which has been snapshotted
984                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
985                 assertEquals(5, leaderActor.getReplicatedLog().size());
986                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
987
988                 // simulate a real snapshot
989                 leaderActor.onReceiveCommand(new SendHeartBeat());
990                 assertEquals(5, leaderActor.getReplicatedLog().size());
991                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
992                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
993                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
994
995
996                 //reply from a slow follower does not initiate a fake snapshot
997                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
998                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
999
1000                 ByteString snapshotBytes = fromObject(Arrays.asList(
1001                         new MockRaftActorContext.MockPayload("foo-0"),
1002                         new MockRaftActorContext.MockPayload("foo-1"),
1003                         new MockRaftActorContext.MockPayload("foo-2"),
1004                         new MockRaftActorContext.MockPayload("foo-3"),
1005                         new MockRaftActorContext.MockPayload("foo-4")));
1006                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1007                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1008
1009                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1010
1011                 //reply from a slow follower after should not raise errors
1012                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1013                 assertEquals(0, leaderActor.getReplicatedLog().size());
1014             }
1015         };
1016     }
1017
1018     @Test
1019     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1020         new JavaTestKit(getSystem()) {{
1021             String persistenceId = factory.generateActorId("leader-");
1022             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1023             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1024             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1025             config.setSnapshotBatchCount(5);
1026
1027             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1028
1029             Map<String, String> peerAddresses = new HashMap<>();
1030
1031             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1032                     MockRaftActor.props(persistenceId, peerAddresses,
1033                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1034
1035             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1036             leaderActor.getRaftActorContext().setCommitIndex(3);
1037             leaderActor.getRaftActorContext().setLastApplied(3);
1038             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1039
1040             leaderActor.waitForInitializeBehaviorComplete();
1041             for(int i=0;i< 4;i++) {
1042                 leaderActor.getReplicatedLog()
1043                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1044                                 new MockRaftActorContext.MockPayload("A")));
1045             }
1046
1047             Leader leader = new Leader(leaderActor.getRaftActorContext());
1048             leaderActor.setCurrentBehavior(leader);
1049             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1050
1051             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1052             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1053
1054             // Now send a CaptureSnapshotReply
1055             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1056
1057             // Trimming log in this scenario is a no-op
1058             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1059             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1060             assertEquals(-1, leader.getReplicatedToAllIndex());
1061
1062         }};
1063     }
1064
1065     @Test
1066     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1067         new JavaTestKit(getSystem()) {{
1068             String persistenceId = factory.generateActorId("leader-");
1069             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1070             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1071             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1072             config.setSnapshotBatchCount(5);
1073
1074             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1075
1076             Map<String, String> peerAddresses = new HashMap<>();
1077
1078             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1079                     MockRaftActor.props(persistenceId, peerAddresses,
1080                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1081
1082             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1083             leaderActor.getRaftActorContext().setCommitIndex(3);
1084             leaderActor.getRaftActorContext().setLastApplied(3);
1085             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1086             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1087
1088             leaderActor.waitForInitializeBehaviorComplete();
1089             Leader leader = new Leader(leaderActor.getRaftActorContext());
1090             leaderActor.setCurrentBehavior(leader);
1091             leader.setReplicatedToAllIndex(3);
1092             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1093
1094             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1095             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1096
1097             // Now send a CaptureSnapshotReply
1098             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1099
1100             // Trimming log in this scenario is a no-op
1101             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1102             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1103             assertEquals(3, leader.getReplicatedToAllIndex());
1104
1105         }};
1106     }
1107
1108     private ByteString fromObject(Object snapshot) throws Exception {
1109         ByteArrayOutputStream b = null;
1110         ObjectOutputStream o = null;
1111         try {
1112             b = new ByteArrayOutputStream();
1113             o = new ObjectOutputStream(b);
1114             o.writeObject(snapshot);
1115             byte[] snapshotBytes = b.toByteArray();
1116             return ByteString.copyFrom(snapshotBytes);
1117         } finally {
1118             if (o != null) {
1119                 o.flush();
1120                 o.close();
1121             }
1122             if (b != null) {
1123                 b.close();
1124             }
1125         }
1126     }
1127
1128 }