Merge "Fix RaftActorTest.testRaftRoleChangeNotifier failure"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.RoleChanged;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
61 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
63 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
64 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
65 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
66 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
67 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
68 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
70 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
71 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
72 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
73 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
74 import scala.concurrent.Await;
75 import scala.concurrent.Future;
76 import scala.concurrent.duration.Duration;
77 import scala.concurrent.duration.FiniteDuration;
78
79 public class RaftActorTest extends AbstractActorTest {
80
81     private TestActorFactory factory;
82
83     @Before
84     public void setUp(){
85         factory = new TestActorFactory(getSystem());
86     }
87
88     @After
89     public void tearDown() throws Exception {
90         factory.close();
91         MockAkkaJournal.clearJournal();
92         MockSnapshotStore.setMockSnapshot(null);
93     }
94
95     public static class MockRaftActor extends RaftActor {
96
97         private final DataPersistenceProvider dataPersistenceProvider;
98         private final RaftActor delegate;
99         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
100         private final List<Object> state;
101         private ActorRef roleChangeNotifier;
102         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
103
104         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
105             private static final long serialVersionUID = 1L;
106             private final Map<String, String> peerAddresses;
107             private final String id;
108             private final Optional<ConfigParams> config;
109             private final DataPersistenceProvider dataPersistenceProvider;
110             private final ActorRef roleChangeNotifier;
111
112             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
113                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
114                 ActorRef roleChangeNotifier) {
115                 this.peerAddresses = peerAddresses;
116                 this.id = id;
117                 this.config = config;
118                 this.dataPersistenceProvider = dataPersistenceProvider;
119                 this.roleChangeNotifier = roleChangeNotifier;
120             }
121
122             @Override
123             public MockRaftActor create() throws Exception {
124                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
125                     dataPersistenceProvider);
126                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
127                 return mockRaftActor;
128             }
129         }
130
131         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
132                              DataPersistenceProvider dataPersistenceProvider) {
133             super(id, peerAddresses, config);
134             state = new ArrayList<>();
135             this.delegate = mock(RaftActor.class);
136             if(dataPersistenceProvider == null){
137                 this.dataPersistenceProvider = new PersistentDataProvider();
138             } else {
139                 this.dataPersistenceProvider = dataPersistenceProvider;
140             }
141         }
142
143         public void waitForRecoveryComplete() {
144             try {
145                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
146             } catch (InterruptedException e) {
147                 e.printStackTrace();
148             }
149         }
150
151         public void waitForInitializeBehaviorComplete() {
152             try {
153                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
154             } catch (InterruptedException e) {
155                 e.printStackTrace();
156             }
157         }
158
159         public List<Object> getState() {
160             return state;
161         }
162
163         public static Props props(final String id, final Map<String, String> peerAddresses,
164                 Optional<ConfigParams> config){
165             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
166         }
167
168         public static Props props(final String id, final Map<String, String> peerAddresses,
169                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
170             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
171         }
172
173         public static Props props(final String id, final Map<String, String> peerAddresses,
174             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
175             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
176         }
177
178         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
179             delegate.applyState(clientActor, identifier, data);
180             LOG.info("applyState called");
181         }
182
183         @Override
184         protected void startLogRecoveryBatch(int maxBatchSize) {
185         }
186
187         @Override
188         protected void appendRecoveredLogEntry(Payload data) {
189             state.add(data);
190         }
191
192         @Override
193         protected void applyCurrentLogRecoveryBatch() {
194         }
195
196         @Override
197         protected void onRecoveryComplete() {
198             delegate.onRecoveryComplete();
199             recoveryComplete.countDown();
200         }
201
202         @Override
203         protected void initializeBehavior() {
204             super.initializeBehavior();
205             initializeBehaviorComplete.countDown();
206         }
207
208         @Override
209         protected void applyRecoverySnapshot(byte[] bytes) {
210             delegate.applyRecoverySnapshot(bytes);
211             try {
212                 Object data = toObject(bytes);
213                 if (data instanceof List) {
214                     state.addAll((List<?>) data);
215                 }
216             } catch (Exception e) {
217                 e.printStackTrace();
218             }
219         }
220
221         @Override protected void createSnapshot() {
222             delegate.createSnapshot();
223         }
224
225         @Override protected void applySnapshot(byte [] snapshot) {
226             delegate.applySnapshot(snapshot);
227         }
228
229         @Override protected void onStateChanged() {
230             delegate.onStateChanged();
231         }
232
233         @Override
234         protected DataPersistenceProvider persistence() {
235             return this.dataPersistenceProvider;
236         }
237
238         @Override
239         protected Optional<ActorRef> getRoleChangeNotifier() {
240             return Optional.fromNullable(roleChangeNotifier);
241         }
242
243         @Override public String persistenceId() {
244             return this.getId();
245         }
246
247         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
248             Object obj = null;
249             ByteArrayInputStream bis = null;
250             ObjectInputStream ois = null;
251             try {
252                 bis = new ByteArrayInputStream(bs);
253                 ois = new ObjectInputStream(bis);
254                 obj = ois.readObject();
255             } finally {
256                 if (bis != null) {
257                     bis.close();
258                 }
259                 if (ois != null) {
260                     ois.close();
261                 }
262             }
263             return obj;
264         }
265
266         public ReplicatedLog getReplicatedLog(){
267             return this.getRaftActorContext().getReplicatedLog();
268         }
269
270     }
271
272
273     private static class RaftActorTestKit extends JavaTestKit {
274         private final ActorRef raftActor;
275
276         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
277             super(actorSystem);
278
279             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
280                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
281
282         }
283
284
285         public ActorRef getRaftActor() {
286             return raftActor;
287         }
288
289         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
290             // Wait for a specific log message to show up
291             return
292                 new JavaTestKit.EventFilter<Boolean>(logEventClass
293                 ) {
294                     @Override
295                     protected Boolean run() {
296                         return true;
297                     }
298                 }.from(raftActor.path().toString())
299                     .message(message)
300                     .occurrences(1).exec();
301
302
303         }
304
305         protected void waitUntilLeader(){
306             waitUntilLeader(raftActor);
307         }
308
309         protected void waitUntilLeader(ActorRef actorRef) {
310             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
311             for(int i = 0; i < 20 * 5; i++) {
312                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
313                 try {
314                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
315                     if(resp.getLeaderActor() != null) {
316                         return;
317                     }
318                 } catch(TimeoutException e) {
319                 } catch(Exception e) {
320                     System.err.println("FindLeader threw ex");
321                     e.printStackTrace();
322                 }
323
324
325                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
326             }
327
328             Assert.fail("Leader not found for actorRef " + actorRef.path());
329         }
330
331     }
332
333
334     @Test
335     public void testConstruction() {
336         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
337     }
338
339     @Test
340     public void testFindLeaderWhenLeaderIsSelf(){
341         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
342         kit.waitUntilLeader();
343     }
344
345     @Test
346     public void testRaftActorRecovery() throws Exception {
347         new JavaTestKit(getSystem()) {{
348             String persistenceId = factory.generateActorId("follower-");
349
350             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
351             // Set the heartbeat interval high to essentially disable election otherwise the test
352             // may fail if the actor is switched to Leader and the commitIndex is set to the last
353             // log entry.
354             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
355
356             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
357                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
358
359             watch(followerActor);
360
361             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
362             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
363                     new MockRaftActorContext.MockPayload("E"));
364             snapshotUnappliedEntries.add(entry1);
365
366             int lastAppliedDuringSnapshotCapture = 3;
367             int lastIndexDuringSnapshotCapture = 4;
368
369             // 4 messages as part of snapshot, which are applied to state
370             ByteString snapshotBytes = fromObject(Arrays.asList(
371                     new MockRaftActorContext.MockPayload("A"),
372                     new MockRaftActorContext.MockPayload("B"),
373                     new MockRaftActorContext.MockPayload("C"),
374                     new MockRaftActorContext.MockPayload("D")));
375
376             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
377                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
378                     lastAppliedDuringSnapshotCapture, 1);
379             MockSnapshotStore.setMockSnapshot(snapshot);
380             MockSnapshotStore.setPersistenceId(persistenceId);
381
382             // add more entries after snapshot is taken
383             List<ReplicatedLogEntry> entries = new ArrayList<>();
384             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
385                     new MockRaftActorContext.MockPayload("F"));
386             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
387                     new MockRaftActorContext.MockPayload("G"));
388             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
389                     new MockRaftActorContext.MockPayload("H"));
390             entries.add(entry2);
391             entries.add(entry3);
392             entries.add(entry4);
393
394             int lastAppliedToState = 5;
395             int lastIndex = 7;
396
397             MockAkkaJournal.addToJournal(5, entry2);
398             // 2 entries are applied to state besides the 4 entries in snapshot
399             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
400             MockAkkaJournal.addToJournal(7, entry3);
401             MockAkkaJournal.addToJournal(8, entry4);
402
403             // kill the actor
404             followerActor.tell(PoisonPill.getInstance(), null);
405             expectMsgClass(duration("5 seconds"), Terminated.class);
406
407             unwatch(followerActor);
408
409             //reinstate the actor
410             TestActorRef<MockRaftActor> ref = factory.createTestActor(
411                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
412                             Optional.<ConfigParams>of(config)));
413
414             ref.underlyingActor().waitForRecoveryComplete();
415
416             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
417             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
418                     context.getReplicatedLog().size());
419             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
420             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
421             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
422             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
423         }};
424     }
425
426     /**
427      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
428      * process recovery messages
429      *
430      * @throws Exception
431      */
432
433     @Test
434     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
435         new JavaTestKit(getSystem()) {
436             {
437                 String persistenceId = factory.generateActorId("leader-");
438
439                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
440
441                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
442
443                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
444                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
445
446                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
447
448                 // Wait for akka's recovery to complete so it doesn't interfere.
449                 mockRaftActor.waitForRecoveryComplete();
450
451                 ByteString snapshotBytes = fromObject(Arrays.asList(
452                         new MockRaftActorContext.MockPayload("A"),
453                         new MockRaftActorContext.MockPayload("B"),
454                         new MockRaftActorContext.MockPayload("C"),
455                         new MockRaftActorContext.MockPayload("D")));
456
457                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
458                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
459
460                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
461
462                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
463
464                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
465
466                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
467
468                 assertEquals("add replicated log entry", 1, replicatedLog.size());
469
470                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
471
472                 assertEquals("add replicated log entry", 2, replicatedLog.size());
473
474                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
475
476                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
477
478                 // The snapshot had 4 items + we added 2 more items during the test
479                 // We start removing from 5 and we should get 1 item in the replicated log
480                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
481
482                 assertEquals("remove log entries", 1, replicatedLog.size());
483
484                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
485
486                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
487                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
488
489                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
490
491             }};
492     }
493
494     /**
495      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
496      * not process recovery messages
497      *
498      * @throws Exception
499      */
500     @Test
501     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
502         new JavaTestKit(getSystem()) {
503             {
504                 String persistenceId = factory.generateActorId("leader-");
505
506                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
507
508                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
509
510                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
511                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
512
513                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
514
515                 // Wait for akka's recovery to complete so it doesn't interfere.
516                 mockRaftActor.waitForRecoveryComplete();
517
518                 ByteString snapshotBytes = fromObject(Arrays.asList(
519                         new MockRaftActorContext.MockPayload("A"),
520                         new MockRaftActorContext.MockPayload("B"),
521                         new MockRaftActorContext.MockPayload("C"),
522                         new MockRaftActorContext.MockPayload("D")));
523
524                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
525                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
526
527                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
528
529                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
530
531                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
532
533                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
534
535                 assertEquals("add replicated log entry", 0, replicatedLog.size());
536
537                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
538
539                 assertEquals("add replicated log entry", 0, replicatedLog.size());
540
541                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
542
543                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
544
545                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
546
547                 assertEquals("remove log entries", 0, replicatedLog.size());
548
549                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
550
551                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
552                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
553
554                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
555
556             }};
557     }
558
559
560     @Test
561     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
562         new JavaTestKit(getSystem()) {
563             {
564                 String persistenceId = factory.generateActorId("leader-");
565
566                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
567
568                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
569
570                 CountDownLatch persistLatch = new CountDownLatch(1);
571                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
572                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
573
574                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
575                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
576
577                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
578
579                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
580
581                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
582
583             }
584
585         };
586     }
587
588     @Test
589     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
590         new JavaTestKit(getSystem()) {
591             {
592                 String persistenceId = factory.generateActorId("leader-");
593
594                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
595
596                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
597
598                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
599
600                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
601                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
602
603                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
604
605                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
606
607                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
608
609                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
610
611             }
612
613         };
614     }
615
616     @Test
617     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
618         new JavaTestKit(getSystem()) {
619             {
620                 String persistenceId = factory.generateActorId("leader-");
621
622                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
623
624                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
625
626                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
627
628                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
629                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
630
631                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
632
633                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
634
635                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
636
637                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
638
639             }
640
641         };
642     }
643
644     @Test
645     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
646         new JavaTestKit(getSystem()) {
647             {
648                 String persistenceId = factory.generateActorId("leader-");
649
650                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
651
652                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
653
654                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
655
656                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
657                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
658
659                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
660
661                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
662
663                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
664
665             }
666
667         };
668     }
669
670     @Test
671     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
672         new JavaTestKit(getSystem()) {
673             {
674                 String persistenceId = factory.generateActorId("leader-");
675
676                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
677
678                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
679
680                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
681
682                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
683                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
684                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
685
686                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
687
688                 ByteString snapshotBytes = fromObject(Arrays.asList(
689                         new MockRaftActorContext.MockPayload("A"),
690                         new MockRaftActorContext.MockPayload("B"),
691                         new MockRaftActorContext.MockPayload("C"),
692                         new MockRaftActorContext.MockPayload("D")));
693
694                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
695
696                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
697
698                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
699
700                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
701
702                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
703
704             }
705         };
706     }
707
708     @Test
709     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
710         new JavaTestKit(getSystem()) {
711             {
712                 String persistenceId = factory.generateActorId("leader-");
713
714                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
715
716                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
717
718                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
719
720                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
721                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
722
723                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
724
725                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
726                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
727                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
728                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
729                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
730
731                 ByteString snapshotBytes = fromObject(Arrays.asList(
732                         new MockRaftActorContext.MockPayload("A"),
733                         new MockRaftActorContext.MockPayload("B"),
734                         new MockRaftActorContext.MockPayload("C"),
735                         new MockRaftActorContext.MockPayload("D")));
736
737                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
738                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
739
740                 long replicatedToAllIndex = 1;
741                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
742
743                 verify(mockRaftActor.delegate).createSnapshot();
744
745                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
746
747                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
748
749                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
750
751                 verify(dataPersistenceProvider).deleteMessages(100);
752
753                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
754                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
755
756                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
757                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
758                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
759
760                 // Index 2 will not be in the log because it was removed due to snapshotting
761                 assertNull(mockRaftActor.getReplicatedLog().get(1));
762                 assertNull(mockRaftActor.getReplicatedLog().get(0));
763
764             }
765         };
766     }
767
768     @Test
769     public void testApplyState() throws Exception {
770
771         new JavaTestKit(getSystem()) {
772             {
773                 String persistenceId = factory.generateActorId("leader-");
774
775                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
776
777                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
778
779                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
780
781                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
782                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
783
784                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
785
786                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
787                         new MockRaftActorContext.MockPayload("F"));
788
789                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
790
791                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
792
793             }
794         };
795     }
796
797     @Test
798     public void testApplySnapshot() throws Exception {
799         new JavaTestKit(getSystem()) {
800             {
801                 String persistenceId = factory.generateActorId("leader-");
802
803                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
804
805                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
806
807                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
808
809                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
810                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
811
812                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
813
814                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
815
816                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
817                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
818                 oldReplicatedLog.append(
819                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
820                                 mock(Payload.class)));
821
822                 ByteString snapshotBytes = fromObject(Arrays.asList(
823                         new MockRaftActorContext.MockPayload("A"),
824                         new MockRaftActorContext.MockPayload("B"),
825                         new MockRaftActorContext.MockPayload("C"),
826                         new MockRaftActorContext.MockPayload("D")));
827
828                 Snapshot snapshot = mock(Snapshot.class);
829
830                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
831
832                 doReturn(3L).when(snapshot).getLastAppliedIndex();
833
834                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
835
836                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
837
838                 assertTrue("The replicatedLog should have changed",
839                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
840
841                 assertEquals("lastApplied should be same as in the snapshot",
842                         (Long) 3L, mockRaftActor.getLastApplied());
843
844                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
845
846             }
847         };
848     }
849
850     @Test
851     public void testSaveSnapshotFailure() throws Exception {
852         new JavaTestKit(getSystem()) {
853             {
854                 String persistenceId = factory.generateActorId("leader-");
855
856                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
857
858                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
859
860                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
861
862                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
863                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
864
865                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
866
867                 ByteString snapshotBytes = fromObject(Arrays.asList(
868                         new MockRaftActorContext.MockPayload("A"),
869                         new MockRaftActorContext.MockPayload("B"),
870                         new MockRaftActorContext.MockPayload("C"),
871                         new MockRaftActorContext.MockPayload("D")));
872
873                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
874
875                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
876
877                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
878
879                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
880
881                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
882                         new Exception()));
883
884                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
885                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
886
887             }
888         };
889     }
890
891     @Test
892     public void testRaftRoleChangeNotifier() throws Exception {
893         new JavaTestKit(getSystem()) {{
894             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
895             MessageCollectorActor.waitUntilReady(notifierActor);
896
897             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
898             long heartBeatInterval = 100;
899             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
900             config.setElectionTimeoutFactor(1);
901
902             String persistenceId = factory.generateActorId("notifier-");
903
904             factory.createTestActor(MockRaftActor.props(persistenceId,
905                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
906
907             List<RoleChanged> matches =  null;
908             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
909                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
910                 assertNotNull(matches);
911                 if(matches.size() == 3) {
912                     break;
913                 }
914                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
915             }
916
917             assertEquals(3, matches.size());
918
919             // check if the notifier got a role change from null to Follower
920             RoleChanged raftRoleChanged = matches.get(0);
921             assertEquals(persistenceId, raftRoleChanged.getMemberId());
922             assertNull(raftRoleChanged.getOldRole());
923             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
924
925             // check if the notifier got a role change from Follower to Candidate
926             raftRoleChanged = matches.get(1);
927             assertEquals(persistenceId, raftRoleChanged.getMemberId());
928             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
929             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
930
931             // check if the notifier got a role change from Candidate to Leader
932             raftRoleChanged = matches.get(2);
933             assertEquals(persistenceId, raftRoleChanged.getMemberId());
934             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
935             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
936         }};
937     }
938
939     @Test
940     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
941         new JavaTestKit(getSystem()) {
942             {
943                 String persistenceId = factory.generateActorId("leader-");
944                 String follower1Id = factory.generateActorId("follower-");
945
946                 ActorRef followerActor1 =
947                         factory.createActor(Props.create(MessageCollectorActor.class));
948
949                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
950                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
951                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
952
953                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
954
955                 Map<String, String> peerAddresses = new HashMap<>();
956                 peerAddresses.put(follower1Id, followerActor1.path().toString());
957
958                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
959                         MockRaftActor.props(persistenceId, peerAddresses,
960                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
961
962                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
963                 leaderActor.getRaftActorContext().setCommitIndex(4);
964                 leaderActor.getRaftActorContext().setLastApplied(4);
965                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
966
967                 leaderActor.waitForInitializeBehaviorComplete();
968
969                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
970
971                 Leader leader = new Leader(leaderActor.getRaftActorContext());
972                 leaderActor.setCurrentBehavior(leader);
973                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
974
975                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
976                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
977
978                 assertEquals(8, leaderActor.getReplicatedLog().size());
979
980                 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
981
982                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
983                 verify(leaderActor.delegate).createSnapshot();
984
985                 assertEquals(8, leaderActor.getReplicatedLog().size());
986
987                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
988                 //fake snapshot on index 5
989                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
990
991                 assertEquals(8, leaderActor.getReplicatedLog().size());
992
993                 //fake snapshot on index 6
994                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
995                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
996                 assertEquals(8, leaderActor.getReplicatedLog().size());
997
998                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
999
1000                 assertEquals(8, leaderActor.getReplicatedLog().size());
1001
1002                 ByteString snapshotBytes = fromObject(Arrays.asList(
1003                         new MockRaftActorContext.MockPayload("foo-0"),
1004                         new MockRaftActorContext.MockPayload("foo-1"),
1005                         new MockRaftActorContext.MockPayload("foo-2"),
1006                         new MockRaftActorContext.MockPayload("foo-3"),
1007                         new MockRaftActorContext.MockPayload("foo-4")));
1008                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1009                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1010
1011                 // capture snapshot reply should remove the snapshotted entries only
1012                 assertEquals(3, leaderActor.getReplicatedLog().size());
1013                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1014
1015                 // add another non-replicated entry
1016                 leaderActor.getReplicatedLog().append(
1017                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1018
1019                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1020                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1021                 assertEquals(2, leaderActor.getReplicatedLog().size());
1022                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1023
1024             }
1025         };
1026     }
1027
1028     @Test
1029     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1030         new JavaTestKit(getSystem()) {
1031             {
1032                 String persistenceId = factory.generateActorId("follower-");
1033                 String leaderId = factory.generateActorId("leader-");
1034
1035
1036                 ActorRef leaderActor1 =
1037                         factory.createActor(Props.create(MessageCollectorActor.class));
1038
1039                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1040                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1041                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1042
1043                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1044
1045                 Map<String, String> peerAddresses = new HashMap<>();
1046                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1047
1048                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1049                         MockRaftActor.props(persistenceId, peerAddresses,
1050                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1051
1052                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1053                 followerActor.getRaftActorContext().setCommitIndex(4);
1054                 followerActor.getRaftActorContext().setLastApplied(4);
1055                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1056
1057                 followerActor.waitForInitializeBehaviorComplete();
1058
1059
1060                 Follower follower = new Follower(followerActor.getRaftActorContext());
1061                 followerActor.setCurrentBehavior(follower);
1062                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1063
1064                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1065                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1066                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1067
1068                 // log has indices 0-5
1069                 assertEquals(6, followerActor.getReplicatedLog().size());
1070
1071                 //snapshot on 4
1072                 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1073
1074                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1075                 verify(followerActor.delegate).createSnapshot();
1076
1077                 assertEquals(6, followerActor.getReplicatedLog().size());
1078
1079                 //fake snapshot on index 6
1080                 List<ReplicatedLogEntry> entries =
1081                         Arrays.asList(
1082                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1083                                         new MockRaftActorContext.MockPayload("foo-6"))
1084                         );
1085                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1086                 assertEquals(7, followerActor.getReplicatedLog().size());
1087
1088                 //fake snapshot on index 7
1089                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1090
1091                 entries =
1092                         Arrays.asList(
1093                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1094                                         new MockRaftActorContext.MockPayload("foo-7"))
1095                         );
1096                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1097                 assertEquals(8, followerActor.getReplicatedLog().size());
1098
1099                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1100
1101
1102                 ByteString snapshotBytes = fromObject(Arrays.asList(
1103                         new MockRaftActorContext.MockPayload("foo-0"),
1104                         new MockRaftActorContext.MockPayload("foo-1"),
1105                         new MockRaftActorContext.MockPayload("foo-2"),
1106                         new MockRaftActorContext.MockPayload("foo-3"),
1107                         new MockRaftActorContext.MockPayload("foo-4")));
1108                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1109                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1110
1111                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1112                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1113                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1114
1115                 entries =
1116                         Arrays.asList(
1117                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1118                                         new MockRaftActorContext.MockPayload("foo-7"))
1119                         );
1120                 // send an additional entry 8 with leaderCommit = 7
1121                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1122
1123                 // 7 and 8, as lastapplied is 7
1124                 assertEquals(2, followerActor.getReplicatedLog().size());
1125
1126             }
1127         };
1128     }
1129
1130     @Test
1131     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1132         new JavaTestKit(getSystem()) {
1133             {
1134                 String persistenceId = factory.generateActorId("leader-");
1135                 String follower1Id = factory.generateActorId("follower-");
1136                 String follower2Id = factory.generateActorId("follower-");
1137
1138                 ActorRef followerActor1 =
1139                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1140                 ActorRef followerActor2 =
1141                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1142
1143                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1144                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1145                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1146
1147                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1148
1149                 Map<String, String> peerAddresses = new HashMap<>();
1150                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1151                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1152
1153                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1154                         MockRaftActor.props(persistenceId, peerAddresses,
1155                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1156
1157                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1158                 leaderActor.getRaftActorContext().setCommitIndex(9);
1159                 leaderActor.getRaftActorContext().setLastApplied(9);
1160                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1161
1162                 leaderActor.waitForInitializeBehaviorComplete();
1163
1164                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1165                 leaderActor.setCurrentBehavior(leader);
1166                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1167
1168                 // create 5 entries in the log
1169                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1170                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1171
1172                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1173                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1174                 //setting replicatedToAllIndex = 9, for the log to clear
1175                 leader.setReplicatedToAllIndex(9);
1176                 assertEquals(5, leaderActor.getReplicatedLog().size());
1177                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1178
1179                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1180                 assertEquals(5, leaderActor.getReplicatedLog().size());
1181                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1182
1183                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1184                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1185                 assertEquals(5, leaderActor.getReplicatedLog().size());
1186                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1187
1188                 // simulate a real snapshot
1189                 leaderActor.onReceiveCommand(new SendHeartBeat());
1190                 assertEquals(5, leaderActor.getReplicatedLog().size());
1191                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1192                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1193                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1194
1195
1196                 //reply from a slow follower does not initiate a fake snapshot
1197                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1198                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1199
1200                 ByteString snapshotBytes = fromObject(Arrays.asList(
1201                         new MockRaftActorContext.MockPayload("foo-0"),
1202                         new MockRaftActorContext.MockPayload("foo-1"),
1203                         new MockRaftActorContext.MockPayload("foo-2"),
1204                         new MockRaftActorContext.MockPayload("foo-3"),
1205                         new MockRaftActorContext.MockPayload("foo-4")));
1206                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1207                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1208
1209                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1210
1211                 //reply from a slow follower after should not raise errors
1212                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1213                 assertEquals(0, leaderActor.getReplicatedLog().size());
1214             }
1215         };
1216     }
1217
1218     private ByteString fromObject(Object snapshot) throws Exception {
1219         ByteArrayOutputStream b = null;
1220         ObjectOutputStream o = null;
1221         try {
1222             b = new ByteArrayOutputStream();
1223             o = new ObjectOutputStream(b);
1224             o.writeObject(snapshot);
1225             byte[] snapshotBytes = b.toByteArray();
1226             return ByteString.copyFrom(snapshotBytes);
1227         } finally {
1228             if (o != null) {
1229                 o.flush();
1230                 o.close();
1231             }
1232             if (b != null) {
1233                 b.close();
1234             }
1235         }
1236     }
1237
1238 }