Bug-2692:Avoid fake snapshot during initiate snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import org.junit.After;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
43 import org.opendaylight.controller.cluster.notifications.RoleChanged;
44 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
50 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
51 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
52 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
53 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
59 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
60 import scala.concurrent.Await;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.Duration;
63 import scala.concurrent.duration.FiniteDuration;
64
65 import static org.junit.Assert.assertEquals;
66 import static org.junit.Assert.assertFalse;
67 import static org.junit.Assert.assertNotEquals;
68 import static org.junit.Assert.assertNotNull;
69 import static org.junit.Assert.assertNull;
70 import static org.junit.Assert.assertTrue;
71 import static org.mockito.Matchers.any;
72 import static org.mockito.Matchers.anyObject;
73 import static org.mockito.Matchers.eq;
74 import static org.mockito.Mockito.doReturn;
75 import static org.mockito.Mockito.mock;
76 import static org.mockito.Mockito.times;
77 import static org.mockito.Mockito.verify;
78
79 public class RaftActorTest extends AbstractActorTest {
80
81
82     @After
83     public void tearDown() {
84         MockAkkaJournal.clearJournal();
85         MockSnapshotStore.setMockSnapshot(null);
86     }
87
88     public static class MockRaftActor extends RaftActor {
89
90         private final DataPersistenceProvider dataPersistenceProvider;
91         private final RaftActor delegate;
92         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
93         private final List<Object> state;
94         private ActorRef roleChangeNotifier;
95         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
96
97         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
98             private static final long serialVersionUID = 1L;
99             private final Map<String, String> peerAddresses;
100             private final String id;
101             private final Optional<ConfigParams> config;
102             private final DataPersistenceProvider dataPersistenceProvider;
103             private final ActorRef roleChangeNotifier;
104
105             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
106                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
107                 ActorRef roleChangeNotifier) {
108                 this.peerAddresses = peerAddresses;
109                 this.id = id;
110                 this.config = config;
111                 this.dataPersistenceProvider = dataPersistenceProvider;
112                 this.roleChangeNotifier = roleChangeNotifier;
113             }
114
115             @Override
116             public MockRaftActor create() throws Exception {
117                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
118                     dataPersistenceProvider);
119                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
120                 return mockRaftActor;
121             }
122         }
123
124         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
125                              DataPersistenceProvider dataPersistenceProvider) {
126             super(id, peerAddresses, config);
127             state = new ArrayList<>();
128             this.delegate = mock(RaftActor.class);
129             if(dataPersistenceProvider == null){
130                 this.dataPersistenceProvider = new PersistentDataProvider();
131             } else {
132                 this.dataPersistenceProvider = dataPersistenceProvider;
133             }
134         }
135
136         public void waitForRecoveryComplete() {
137             try {
138                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
139             } catch (InterruptedException e) {
140                 e.printStackTrace();
141             }
142         }
143
144         public void waitForInitializeBehaviorComplete() {
145             try {
146                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
147             } catch (InterruptedException e) {
148                 e.printStackTrace();
149             }
150         }
151
152         public List<Object> getState() {
153             return state;
154         }
155
156         public static Props props(final String id, final Map<String, String> peerAddresses,
157                 Optional<ConfigParams> config){
158             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
159         }
160
161         public static Props props(final String id, final Map<String, String> peerAddresses,
162                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
163             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
164         }
165
166         public static Props props(final String id, final Map<String, String> peerAddresses,
167             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
168             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
169         }
170
171         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
172             delegate.applyState(clientActor, identifier, data);
173             LOG.info("applyState called");
174         }
175
176         @Override
177         protected void startLogRecoveryBatch(int maxBatchSize) {
178         }
179
180         @Override
181         protected void appendRecoveredLogEntry(Payload data) {
182             state.add(data);
183         }
184
185         @Override
186         protected void applyCurrentLogRecoveryBatch() {
187         }
188
189         @Override
190         protected void onRecoveryComplete() {
191             delegate.onRecoveryComplete();
192             recoveryComplete.countDown();
193         }
194
195         @Override
196         protected void initializeBehavior() {
197             super.initializeBehavior();
198             initializeBehaviorComplete.countDown();
199         }
200
201         @Override
202         protected void applyRecoverySnapshot(byte[] bytes) {
203             delegate.applyRecoverySnapshot(bytes);
204             try {
205                 Object data = toObject(bytes);
206                 if (data instanceof List) {
207                     state.addAll((List<?>) data);
208                 }
209             } catch (Exception e) {
210                 e.printStackTrace();
211             }
212         }
213
214         @Override protected void createSnapshot() {
215             delegate.createSnapshot();
216         }
217
218         @Override protected void applySnapshot(byte [] snapshot) {
219             delegate.applySnapshot(snapshot);
220         }
221
222         @Override protected void onStateChanged() {
223             delegate.onStateChanged();
224         }
225
226         @Override
227         protected DataPersistenceProvider persistence() {
228             return this.dataPersistenceProvider;
229         }
230
231         @Override
232         protected Optional<ActorRef> getRoleChangeNotifier() {
233             return Optional.fromNullable(roleChangeNotifier);
234         }
235
236         @Override public String persistenceId() {
237             return this.getId();
238         }
239
240         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
241             Object obj = null;
242             ByteArrayInputStream bis = null;
243             ObjectInputStream ois = null;
244             try {
245                 bis = new ByteArrayInputStream(bs);
246                 ois = new ObjectInputStream(bis);
247                 obj = ois.readObject();
248             } finally {
249                 if (bis != null) {
250                     bis.close();
251                 }
252                 if (ois != null) {
253                     ois.close();
254                 }
255             }
256             return obj;
257         }
258
259         public ReplicatedLog getReplicatedLog(){
260             return this.getRaftActorContext().getReplicatedLog();
261         }
262
263     }
264
265
266     private static class RaftActorTestKit extends JavaTestKit {
267         private final ActorRef raftActor;
268
269         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
270             super(actorSystem);
271
272             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
273                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
274
275         }
276
277
278         public ActorRef getRaftActor() {
279             return raftActor;
280         }
281
282         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
283             // Wait for a specific log message to show up
284             return
285                 new JavaTestKit.EventFilter<Boolean>(logEventClass
286                 ) {
287                     @Override
288                     protected Boolean run() {
289                         return true;
290                     }
291                 }.from(raftActor.path().toString())
292                     .message(message)
293                     .occurrences(1).exec();
294
295
296         }
297
298         protected void waitUntilLeader(){
299             waitUntilLeader(raftActor);
300         }
301
302         protected void waitUntilLeader(ActorRef actorRef) {
303             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
304             for(int i = 0; i < 20 * 5; i++) {
305                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
306                 try {
307                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
308                     if(resp.getLeaderActor() != null) {
309                         return;
310                     }
311                 } catch(TimeoutException e) {
312                 } catch(Exception e) {
313                     System.err.println("FindLeader threw ex");
314                     e.printStackTrace();
315                 }
316
317
318                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
319             }
320
321             Assert.fail("Leader not found for actorRef " + actorRef.path());
322         }
323
324     }
325
326
327     @Test
328     public void testConstruction() {
329         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
330     }
331
332     @Test
333     public void testFindLeaderWhenLeaderIsSelf(){
334         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
335         kit.waitUntilLeader();
336     }
337
338     @Test
339     public void testRaftActorRecovery() throws Exception {
340         new JavaTestKit(getSystem()) {{
341             String persistenceId = "follower10";
342
343             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
344             // Set the heartbeat interval high to essentially disable election otherwise the test
345             // may fail if the actor is switched to Leader and the commitIndex is set to the last
346             // log entry.
347             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
348
349             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
350                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
351
352             watch(followerActor);
353
354             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
355             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
356                     new MockRaftActorContext.MockPayload("E"));
357             snapshotUnappliedEntries.add(entry1);
358
359             int lastAppliedDuringSnapshotCapture = 3;
360             int lastIndexDuringSnapshotCapture = 4;
361
362                 // 4 messages as part of snapshot, which are applied to state
363             ByteString snapshotBytes  = fromObject(Arrays.asList(
364                     new MockRaftActorContext.MockPayload("A"),
365                     new MockRaftActorContext.MockPayload("B"),
366                     new MockRaftActorContext.MockPayload("C"),
367                     new MockRaftActorContext.MockPayload("D")));
368
369             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
370                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
371                     lastAppliedDuringSnapshotCapture, 1);
372             MockSnapshotStore.setMockSnapshot(snapshot);
373             MockSnapshotStore.setPersistenceId(persistenceId);
374
375             // add more entries after snapshot is taken
376             List<ReplicatedLogEntry> entries = new ArrayList<>();
377             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
378                     new MockRaftActorContext.MockPayload("F"));
379             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
380                     new MockRaftActorContext.MockPayload("G"));
381             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
382                     new MockRaftActorContext.MockPayload("H"));
383             entries.add(entry2);
384             entries.add(entry3);
385             entries.add(entry4);
386
387             int lastAppliedToState = 5;
388             int lastIndex = 7;
389
390             MockAkkaJournal.addToJournal(5, entry2);
391             // 2 entries are applied to state besides the 4 entries in snapshot
392             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
393             MockAkkaJournal.addToJournal(7, entry3);
394             MockAkkaJournal.addToJournal(8, entry4);
395
396             // kill the actor
397             followerActor.tell(PoisonPill.getInstance(), null);
398             expectMsgClass(duration("5 seconds"), Terminated.class);
399
400             unwatch(followerActor);
401
402             //reinstate the actor
403             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
404                     MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
405                             Optional.<ConfigParams>of(config)));
406
407             ref.underlyingActor().waitForRecoveryComplete();
408
409             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
410             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
411                     context.getReplicatedLog().size());
412             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
413             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
414             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
415             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
416         }};
417     }
418
419     /**
420      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
421      * process recovery messages
422      *
423      * @throws Exception
424      */
425
426     @Test
427     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
428         new JavaTestKit(getSystem()) {
429             {
430                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
431
432                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
433
434                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
435
436                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
437                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
438
439                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
440
441                 // Wait for akka's recovery to complete so it doesn't interfere.
442                 mockRaftActor.waitForRecoveryComplete();
443
444                 ByteString snapshotBytes  = fromObject(Arrays.asList(
445                         new MockRaftActorContext.MockPayload("A"),
446                         new MockRaftActorContext.MockPayload("B"),
447                         new MockRaftActorContext.MockPayload("C"),
448                         new MockRaftActorContext.MockPayload("D")));
449
450                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
451                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
452
453                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
454
455                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
456
457                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
458
459                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
460
461                 assertEquals("add replicated log entry", 1, replicatedLog.size());
462
463                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
464
465                 assertEquals("add replicated log entry", 2, replicatedLog.size());
466
467                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
468
469                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
470
471                 // The snapshot had 4 items + we added 2 more items during the test
472                 // We start removing from 5 and we should get 1 item in the replicated log
473                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
474
475                 assertEquals("remove log entries", 1, replicatedLog.size());
476
477                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
478
479                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
480                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
481
482                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
483
484                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
485
486             }};
487     }
488
489     /**
490      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
491      * not process recovery messages
492      *
493      * @throws Exception
494      */
495     @Test
496     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
497         new JavaTestKit(getSystem()) {
498             {
499                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
500
501                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
502
503                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
504
505                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
506                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
507
508                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
509
510                 // Wait for akka's recovery to complete so it doesn't interfere.
511                 mockRaftActor.waitForRecoveryComplete();
512
513                 ByteString snapshotBytes  = fromObject(Arrays.asList(
514                         new MockRaftActorContext.MockPayload("A"),
515                         new MockRaftActorContext.MockPayload("B"),
516                         new MockRaftActorContext.MockPayload("C"),
517                         new MockRaftActorContext.MockPayload("D")));
518
519                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
520                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
521
522                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
523
524                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
525
526                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
527
528                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
529
530                 assertEquals("add replicated log entry", 0, replicatedLog.size());
531
532                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
533
534                 assertEquals("add replicated log entry", 0, replicatedLog.size());
535
536                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
537
538                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
539
540                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
541
542                 assertEquals("remove log entries", 0, replicatedLog.size());
543
544                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
545
546                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
547                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
548
549                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
550
551                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
552             }};
553     }
554
555
556     @Test
557     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
558         new JavaTestKit(getSystem()) {
559             {
560                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
561
562                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
563
564                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
565
566                 CountDownLatch persistLatch = new CountDownLatch(1);
567                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
568                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
569
570                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
571                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
572
573                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
574
575                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
576
577                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
578
579                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
580
581             }
582         };
583     }
584
585     @Test
586     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
587         new JavaTestKit(getSystem()) {
588             {
589                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
590
591                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
592
593                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
594
595                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
596
597                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
598                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
599
600                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
601
602                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
603
604                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
605
606                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
607
608                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
609
610             }
611         };
612     }
613
614     @Test
615     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
616         new JavaTestKit(getSystem()) {
617             {
618                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
619
620                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
621
622                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
623
624                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
625
626                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
627                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
628
629                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
630
631                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
632
633                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
634
635                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
636
637                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
638
639             }
640         };
641     }
642
643     @Test
644     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
645         new JavaTestKit(getSystem()) {
646             {
647                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
648
649                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
650
651                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
652
653                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
654
655                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
656                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
657
658                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
659
660                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
661
662                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
663
664                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
665
666             }
667         };
668     }
669
670     @Test
671     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
672         new JavaTestKit(getSystem()) {
673             {
674                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
675
676                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
677
678                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
679
680                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
681
682                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
683                     MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
684                         Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
685
686                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
687
688                 ByteString snapshotBytes  = fromObject(Arrays.asList(
689                         new MockRaftActorContext.MockPayload("A"),
690                         new MockRaftActorContext.MockPayload("B"),
691                         new MockRaftActorContext.MockPayload("C"),
692                         new MockRaftActorContext.MockPayload("D")));
693
694                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
695
696                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
697
698                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
699
700                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
701
702                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
703
704                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
705
706             }
707         };
708     }
709
710     @Test
711     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
712         new JavaTestKit(getSystem()) {
713             {
714                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
715
716                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
717
718                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
719
720                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
721
722                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
723                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
724
725                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
726
727                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
728                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
729                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
730                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
731                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
732
733                 ByteString snapshotBytes = fromObject(Arrays.asList(
734                         new MockRaftActorContext.MockPayload("A"),
735                         new MockRaftActorContext.MockPayload("B"),
736                         new MockRaftActorContext.MockPayload("C"),
737                         new MockRaftActorContext.MockPayload("D")));
738
739                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
740                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
741
742                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
743
744                 verify(mockRaftActor.delegate).createSnapshot();
745
746                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
747
748                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
749
750                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
751
752                 verify(dataPersistenceProvider).deleteMessages(100);
753
754                 assertEquals(2, mockRaftActor.getReplicatedLog().size());
755
756                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
757                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
758
759                 // Index 2 will not be in the log because it was removed due to snapshotting
760                 assertNull(mockRaftActor.getReplicatedLog().get(2));
761
762                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
763
764             }
765         };
766     }
767
768     @Test
769     public void testApplyState() throws Exception {
770
771         new JavaTestKit(getSystem()) {
772             {
773                 String persistenceId = "testApplyState";
774
775                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
776
777                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
778
779                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
780
781                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
782                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
783
784                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
785
786                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
787                         new MockRaftActorContext.MockPayload("F"));
788
789                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
790
791                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
792
793                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
794
795             }
796         };
797     }
798
799     @Test
800     public void testApplySnapshot() throws Exception {
801         new JavaTestKit(getSystem()) {
802             {
803                 String persistenceId = "testApplySnapshot";
804
805                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
806
807                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
808
809                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
810
811                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
812                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
813
814                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
815
816                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
817
818                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
819                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
820                 oldReplicatedLog.append(
821                     new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
822                         mock(Payload.class)));
823
824                 ByteString snapshotBytes = fromObject(Arrays.asList(
825                     new MockRaftActorContext.MockPayload("A"),
826                     new MockRaftActorContext.MockPayload("B"),
827                     new MockRaftActorContext.MockPayload("C"),
828                     new MockRaftActorContext.MockPayload("D")));
829
830                 Snapshot snapshot = mock(Snapshot.class);
831
832                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
833
834                 doReturn(3L).when(snapshot).getLastAppliedIndex();
835
836                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
837
838                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
839
840                 assertTrue("The replicatedLog should have changed",
841                     oldReplicatedLog != mockRaftActor.getReplicatedLog());
842
843                 assertEquals("lastApplied should be same as in the snapshot",
844                     (Long) 3L, mockRaftActor.getLastApplied());
845
846                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
847
848                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
849
850             }
851         };
852     }
853
854     @Test
855     public void testSaveSnapshotFailure() throws Exception {
856         new JavaTestKit(getSystem()) {
857             {
858                 String persistenceId = "testSaveSnapshotFailure";
859
860                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
861
862                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
863
864                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
865
866                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
867                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
868
869                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
870
871                 ByteString snapshotBytes  = fromObject(Arrays.asList(
872                         new MockRaftActorContext.MockPayload("A"),
873                         new MockRaftActorContext.MockPayload("B"),
874                         new MockRaftActorContext.MockPayload("C"),
875                         new MockRaftActorContext.MockPayload("D")));
876
877                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
878
879                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
880
881                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
882
883                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
884
885                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
886                         new Exception()));
887
888                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
889                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
890
891                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
892
893             }
894         };
895     }
896
897     @Test
898     public void testRaftRoleChangeNotifier() throws Exception {
899         new JavaTestKit(getSystem()) {{
900             ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
901             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
902             String id = "testRaftRoleChangeNotifier";
903
904             TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
905                 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
906
907             // sleeping for a minimum of 2 seconds, if it spans more its fine.
908             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
909
910             List<Object> matches =  MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
911             assertNotNull(matches);
912             assertEquals(3, matches.size());
913
914             // check if the notifier got a role change from null to Follower
915             RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
916             assertEquals(id, raftRoleChanged.getMemberId());
917             assertNull(raftRoleChanged.getOldRole());
918             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
919
920             // check if the notifier got a role change from Follower to Candidate
921             raftRoleChanged = (RoleChanged) matches.get(1);
922             assertEquals(id, raftRoleChanged.getMemberId());
923             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
924             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
925
926             // check if the notifier got a role change from Candidate to Leader
927             raftRoleChanged = (RoleChanged) matches.get(2);
928             assertEquals(id, raftRoleChanged.getMemberId());
929             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
930             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
931         }};
932     }
933
934     @Test
935     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
936         new JavaTestKit(getSystem()) {
937             {
938                 String persistenceId = "leader1";
939
940                 ActorRef followerActor1 =
941                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
942
943                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
944                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
945                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
946
947                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
948
949                 Map<String, String> peerAddresses = new HashMap<>();
950                 peerAddresses.put("follower-1", followerActor1.path().toString());
951
952                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
953                         MockRaftActor.props(persistenceId, peerAddresses,
954                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
955
956                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
957                 leaderActor.getRaftActorContext().setCommitIndex(4);
958                 leaderActor.getRaftActorContext().setLastApplied(4);
959                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
960
961                 leaderActor.waitForInitializeBehaviorComplete();
962
963                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
964
965                 Leader leader = new Leader(leaderActor.getRaftActorContext());
966                 leaderActor.setCurrentBehavior(leader);
967                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
968
969                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
970                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
971
972                 assertEquals(8, leaderActor.getReplicatedLog().size());
973
974                 leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
975                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
976                 verify(leaderActor.delegate).createSnapshot();
977
978                 assertEquals(8, leaderActor.getReplicatedLog().size());
979
980                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
981                 //fake snapshot on index 5
982                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
983
984                 assertEquals(8, leaderActor.getReplicatedLog().size());
985
986                 //fake snapshot on index 6
987                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
988                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
989                 assertEquals(8, leaderActor.getReplicatedLog().size());
990
991                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
992
993                 assertEquals(8, leaderActor.getReplicatedLog().size());
994
995                 ByteString snapshotBytes  = fromObject(Arrays.asList(
996                         new MockRaftActorContext.MockPayload("foo-0"),
997                         new MockRaftActorContext.MockPayload("foo-1"),
998                         new MockRaftActorContext.MockPayload("foo-2"),
999                         new MockRaftActorContext.MockPayload("foo-3"),
1000                         new MockRaftActorContext.MockPayload("foo-4")));
1001                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1002                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1003
1004                 // capture snapshot reply should remove the snapshotted entries only
1005                 assertEquals(3, leaderActor.getReplicatedLog().size());
1006                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1007
1008                 // add another non-replicated entry
1009                 leaderActor.getReplicatedLog().append(
1010                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1011
1012                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1013                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
1014                 assertEquals(2, leaderActor.getReplicatedLog().size());
1015                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1016
1017                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1018
1019             }
1020         };
1021     }
1022
1023     @Test
1024     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1025         new JavaTestKit(getSystem()) {
1026             {
1027                 String persistenceId = "follower1";
1028
1029                 ActorRef leaderActor1 =
1030                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
1031
1032                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1033                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1034                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1035
1036                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1037
1038                 Map<String, String> peerAddresses = new HashMap<>();
1039                 peerAddresses.put("leader", leaderActor1.path().toString());
1040
1041                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1042                         MockRaftActor.props(persistenceId, peerAddresses,
1043                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1044
1045                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1046                 followerActor.getRaftActorContext().setCommitIndex(4);
1047                 followerActor.getRaftActorContext().setLastApplied(4);
1048                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1049
1050                 followerActor.waitForInitializeBehaviorComplete();
1051
1052                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1053                 Follower follower = new Follower(followerActor.getRaftActorContext());
1054                 followerActor.setCurrentBehavior(follower);
1055                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1056
1057                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1058                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1059
1060                 // log as indices 0-5
1061                 assertEquals(6, followerActor.getReplicatedLog().size());
1062
1063                 //snapshot on 4
1064                 followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
1065                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1066                 verify(followerActor.delegate).createSnapshot();
1067
1068                 assertEquals(6, followerActor.getReplicatedLog().size());
1069
1070                 //fake snapshot on index 6
1071                 List<ReplicatedLogEntry> entries =
1072                         Arrays.asList(
1073                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1074                                         new MockRaftActorContext.MockPayload("foo-6"))
1075                         );
1076                 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
1077                 assertEquals(7, followerActor.getReplicatedLog().size());
1078
1079                 //fake snapshot on index 7
1080                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1081
1082                 entries =
1083                         Arrays.asList(
1084                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1085                                         new MockRaftActorContext.MockPayload("foo-7"))
1086                         );
1087                 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
1088                 assertEquals(8, followerActor.getReplicatedLog().size());
1089
1090                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1091
1092
1093                 ByteString snapshotBytes  = fromObject(Arrays.asList(
1094                         new MockRaftActorContext.MockPayload("foo-0"),
1095                         new MockRaftActorContext.MockPayload("foo-1"),
1096                         new MockRaftActorContext.MockPayload("foo-2"),
1097                         new MockRaftActorContext.MockPayload("foo-3"),
1098                         new MockRaftActorContext.MockPayload("foo-4")));
1099                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1100                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1101
1102                 // capture snapshot reply should remove the snapshotted entries only
1103                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1104                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1105
1106                 entries =
1107                         Arrays.asList(
1108                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1109                                         new MockRaftActorContext.MockPayload("foo-7"))
1110                         );
1111                 // send an additional entry 8 with leaderCommit = 7
1112                 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
1113
1114                 // 7 and 8, as lastapplied is 7
1115                 assertEquals(2, followerActor.getReplicatedLog().size());
1116
1117                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1118
1119             }
1120         };
1121     }
1122
1123     @Test
1124     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1125         new JavaTestKit(getSystem()) {
1126             {
1127                 String persistenceId = "leader1";
1128
1129                 ActorRef followerActor1 =
1130                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
1131                 ActorRef followerActor2 =
1132                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
1133
1134                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1135                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1136                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1137
1138                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1139
1140                 Map<String, String> peerAddresses = new HashMap<>();
1141                 peerAddresses.put("follower-1", followerActor1.path().toString());
1142                 peerAddresses.put("follower-2", followerActor2.path().toString());
1143
1144                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1145                         MockRaftActor.props(persistenceId, peerAddresses,
1146                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1147
1148                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1149                 leaderActor.getRaftActorContext().setCommitIndex(9);
1150                 leaderActor.getRaftActorContext().setLastApplied(9);
1151                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1152
1153                 leaderActor.waitForInitializeBehaviorComplete();
1154
1155                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1156                 leaderActor.setCurrentBehavior(leader);
1157                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1158
1159                 // create 5 entries in the log
1160                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1161                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1162                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1163                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1164                 assertEquals(5, leaderActor.getReplicatedLog().size());
1165
1166                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1));
1167                 assertEquals(5, leaderActor.getReplicatedLog().size());
1168
1169                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1170                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1));
1171                 assertEquals(5, leaderActor.getReplicatedLog().size());
1172
1173                 // simulate a real snapshot
1174                 leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
1175                 assertEquals(5, leaderActor.getReplicatedLog().size());
1176                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1177
1178                 //reply from a slow follower does not initiate a fake snapshot
1179                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
1180                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1181
1182                 ByteString snapshotBytes  = fromObject(Arrays.asList(
1183                         new MockRaftActorContext.MockPayload("foo-0"),
1184                         new MockRaftActorContext.MockPayload("foo-1"),
1185                         new MockRaftActorContext.MockPayload("foo-2"),
1186                         new MockRaftActorContext.MockPayload("foo-3"),
1187                         new MockRaftActorContext.MockPayload("foo-4")));
1188                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1189                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1190
1191                 assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
1192
1193                 //reply from a slow follower after should not raise errors
1194                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1));
1195                 assertEquals(0, leaderActor.getReplicatedLog().size());
1196
1197                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1198
1199             }
1200         };
1201     }
1202
1203
1204
1205     private ByteString fromObject(Object snapshot) throws Exception {
1206         ByteArrayOutputStream b = null;
1207         ObjectOutputStream o = null;
1208         try {
1209             b = new ByteArrayOutputStream();
1210             o = new ObjectOutputStream(b);
1211             o.writeObject(snapshot);
1212             byte[] snapshotBytes = b.toByteArray();
1213             return ByteString.copyFrom(snapshotBytes);
1214         } finally {
1215             if (o != null) {
1216                 o.flush();
1217                 o.close();
1218             }
1219             if (b != null) {
1220                 b.close();
1221             }
1222         }
1223     }
1224 }