Merge "Do not duplicate OSGi dependencyManagement"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Test;
54 import org.opendaylight.controller.cluster.DataPersistenceProvider;
55 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
56 import org.opendaylight.controller.cluster.notifications.RoleChanged;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
60 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
61 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
63 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
64 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
65 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
66 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
67 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
68 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
69 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
70 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
71 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
72 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
73 import scala.concurrent.Await;
74 import scala.concurrent.Future;
75 import scala.concurrent.duration.Duration;
76 import scala.concurrent.duration.FiniteDuration;
77
78 public class RaftActorTest extends AbstractActorTest {
79
80
81     @After
82     public void tearDown() {
83         MockAkkaJournal.clearJournal();
84         MockSnapshotStore.setMockSnapshot(null);
85     }
86
87     public static class MockRaftActor extends RaftActor {
88
89         private final DataPersistenceProvider dataPersistenceProvider;
90         private final RaftActor delegate;
91         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
92         private final List<Object> state;
93         private ActorRef roleChangeNotifier;
94         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
95
96         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
97             private static final long serialVersionUID = 1L;
98             private final Map<String, String> peerAddresses;
99             private final String id;
100             private final Optional<ConfigParams> config;
101             private final DataPersistenceProvider dataPersistenceProvider;
102             private final ActorRef roleChangeNotifier;
103
104             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
105                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
106                 ActorRef roleChangeNotifier) {
107                 this.peerAddresses = peerAddresses;
108                 this.id = id;
109                 this.config = config;
110                 this.dataPersistenceProvider = dataPersistenceProvider;
111                 this.roleChangeNotifier = roleChangeNotifier;
112             }
113
114             @Override
115             public MockRaftActor create() throws Exception {
116                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
117                     dataPersistenceProvider);
118                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
119                 return mockRaftActor;
120             }
121         }
122
123         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
124                              DataPersistenceProvider dataPersistenceProvider) {
125             super(id, peerAddresses, config);
126             state = new ArrayList<>();
127             this.delegate = mock(RaftActor.class);
128             if(dataPersistenceProvider == null){
129                 this.dataPersistenceProvider = new PersistentDataProvider();
130             } else {
131                 this.dataPersistenceProvider = dataPersistenceProvider;
132             }
133         }
134
135         public void waitForRecoveryComplete() {
136             try {
137                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
138             } catch (InterruptedException e) {
139                 e.printStackTrace();
140             }
141         }
142
143         public void waitForInitializeBehaviorComplete() {
144             try {
145                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
146             } catch (InterruptedException e) {
147                 e.printStackTrace();
148             }
149         }
150
151         public List<Object> getState() {
152             return state;
153         }
154
155         public static Props props(final String id, final Map<String, String> peerAddresses,
156                 Optional<ConfigParams> config){
157             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
158         }
159
160         public static Props props(final String id, final Map<String, String> peerAddresses,
161                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
162             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
163         }
164
165         public static Props props(final String id, final Map<String, String> peerAddresses,
166             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
167             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
168         }
169
170         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
171             delegate.applyState(clientActor, identifier, data);
172             LOG.info("applyState called");
173         }
174
175         @Override
176         protected void startLogRecoveryBatch(int maxBatchSize) {
177         }
178
179         @Override
180         protected void appendRecoveredLogEntry(Payload data) {
181             state.add(data);
182         }
183
184         @Override
185         protected void applyCurrentLogRecoveryBatch() {
186         }
187
188         @Override
189         protected void onRecoveryComplete() {
190             delegate.onRecoveryComplete();
191             recoveryComplete.countDown();
192         }
193
194         @Override
195         protected void initializeBehavior() {
196             super.initializeBehavior();
197             initializeBehaviorComplete.countDown();
198         }
199
200         @Override
201         protected void applyRecoverySnapshot(byte[] bytes) {
202             delegate.applyRecoverySnapshot(bytes);
203             try {
204                 Object data = toObject(bytes);
205                 if (data instanceof List) {
206                     state.addAll((List<?>) data);
207                 }
208             } catch (Exception e) {
209                 e.printStackTrace();
210             }
211         }
212
213         @Override protected void createSnapshot() {
214             delegate.createSnapshot();
215         }
216
217         @Override protected void applySnapshot(byte [] snapshot) {
218             delegate.applySnapshot(snapshot);
219         }
220
221         @Override protected void onStateChanged() {
222             delegate.onStateChanged();
223         }
224
225         @Override
226         protected DataPersistenceProvider persistence() {
227             return this.dataPersistenceProvider;
228         }
229
230         @Override
231         protected Optional<ActorRef> getRoleChangeNotifier() {
232             return Optional.fromNullable(roleChangeNotifier);
233         }
234
235         @Override public String persistenceId() {
236             return this.getId();
237         }
238
239         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
240             Object obj = null;
241             ByteArrayInputStream bis = null;
242             ObjectInputStream ois = null;
243             try {
244                 bis = new ByteArrayInputStream(bs);
245                 ois = new ObjectInputStream(bis);
246                 obj = ois.readObject();
247             } finally {
248                 if (bis != null) {
249                     bis.close();
250                 }
251                 if (ois != null) {
252                     ois.close();
253                 }
254             }
255             return obj;
256         }
257
258         public ReplicatedLog getReplicatedLog(){
259             return this.getRaftActorContext().getReplicatedLog();
260         }
261
262     }
263
264
265     private static class RaftActorTestKit extends JavaTestKit {
266         private final ActorRef raftActor;
267
268         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
269             super(actorSystem);
270
271             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
272                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
273
274         }
275
276
277         public ActorRef getRaftActor() {
278             return raftActor;
279         }
280
281         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
282             // Wait for a specific log message to show up
283             return
284                 new JavaTestKit.EventFilter<Boolean>(logEventClass
285                 ) {
286                     @Override
287                     protected Boolean run() {
288                         return true;
289                     }
290                 }.from(raftActor.path().toString())
291                     .message(message)
292                     .occurrences(1).exec();
293
294
295         }
296
297         protected void waitUntilLeader(){
298             waitUntilLeader(raftActor);
299         }
300
301         protected void waitUntilLeader(ActorRef actorRef) {
302             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
303             for(int i = 0; i < 20 * 5; i++) {
304                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
305                 try {
306                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
307                     if(resp.getLeaderActor() != null) {
308                         return;
309                     }
310                 } catch(TimeoutException e) {
311                 } catch(Exception e) {
312                     System.err.println("FindLeader threw ex");
313                     e.printStackTrace();
314                 }
315
316
317                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
318             }
319
320             Assert.fail("Leader not found for actorRef " + actorRef.path());
321         }
322
323     }
324
325
326     @Test
327     public void testConstruction() {
328         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
329     }
330
331     @Test
332     public void testFindLeaderWhenLeaderIsSelf(){
333         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
334         kit.waitUntilLeader();
335     }
336
337     @Test
338     public void testRaftActorRecovery() throws Exception {
339         new JavaTestKit(getSystem()) {{
340             String persistenceId = "follower10";
341
342             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
343             // Set the heartbeat interval high to essentially disable election otherwise the test
344             // may fail if the actor is switched to Leader and the commitIndex is set to the last
345             // log entry.
346             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
347
348             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
349                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
350
351             watch(followerActor);
352
353             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
354             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
355                     new MockRaftActorContext.MockPayload("E"));
356             snapshotUnappliedEntries.add(entry1);
357
358             int lastAppliedDuringSnapshotCapture = 3;
359             int lastIndexDuringSnapshotCapture = 4;
360
361                 // 4 messages as part of snapshot, which are applied to state
362             ByteString snapshotBytes  = fromObject(Arrays.asList(
363                     new MockRaftActorContext.MockPayload("A"),
364                     new MockRaftActorContext.MockPayload("B"),
365                     new MockRaftActorContext.MockPayload("C"),
366                     new MockRaftActorContext.MockPayload("D")));
367
368             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
369                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
370                     lastAppliedDuringSnapshotCapture, 1);
371             MockSnapshotStore.setMockSnapshot(snapshot);
372             MockSnapshotStore.setPersistenceId(persistenceId);
373
374             // add more entries after snapshot is taken
375             List<ReplicatedLogEntry> entries = new ArrayList<>();
376             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
377                     new MockRaftActorContext.MockPayload("F"));
378             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
379                     new MockRaftActorContext.MockPayload("G"));
380             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
381                     new MockRaftActorContext.MockPayload("H"));
382             entries.add(entry2);
383             entries.add(entry3);
384             entries.add(entry4);
385
386             int lastAppliedToState = 5;
387             int lastIndex = 7;
388
389             MockAkkaJournal.addToJournal(5, entry2);
390             // 2 entries are applied to state besides the 4 entries in snapshot
391             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
392             MockAkkaJournal.addToJournal(7, entry3);
393             MockAkkaJournal.addToJournal(8, entry4);
394
395             // kill the actor
396             followerActor.tell(PoisonPill.getInstance(), null);
397             expectMsgClass(duration("5 seconds"), Terminated.class);
398
399             unwatch(followerActor);
400
401             //reinstate the actor
402             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
403                     MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
404                             Optional.<ConfigParams>of(config)));
405
406             ref.underlyingActor().waitForRecoveryComplete();
407
408             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
409             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
410                     context.getReplicatedLog().size());
411             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
412             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
413             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
414             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
415         }};
416     }
417
418     /**
419      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
420      * process recovery messages
421      *
422      * @throws Exception
423      */
424
425     @Test
426     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
427         new JavaTestKit(getSystem()) {
428             {
429                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
430
431                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
432
433                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
434
435                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
436                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
437
438                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
439
440                 // Wait for akka's recovery to complete so it doesn't interfere.
441                 mockRaftActor.waitForRecoveryComplete();
442
443                 ByteString snapshotBytes  = fromObject(Arrays.asList(
444                         new MockRaftActorContext.MockPayload("A"),
445                         new MockRaftActorContext.MockPayload("B"),
446                         new MockRaftActorContext.MockPayload("C"),
447                         new MockRaftActorContext.MockPayload("D")));
448
449                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
450                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
451
452                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
453
454                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
455
456                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
457
458                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
459
460                 assertEquals("add replicated log entry", 1, replicatedLog.size());
461
462                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
463
464                 assertEquals("add replicated log entry", 2, replicatedLog.size());
465
466                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
467
468                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
469
470                 // The snapshot had 4 items + we added 2 more items during the test
471                 // We start removing from 5 and we should get 1 item in the replicated log
472                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
473
474                 assertEquals("remove log entries", 1, replicatedLog.size());
475
476                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
477
478                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
479                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
480
481                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
482
483                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
484
485             }};
486     }
487
488     /**
489      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
490      * not process recovery messages
491      *
492      * @throws Exception
493      */
494     @Test
495     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
496         new JavaTestKit(getSystem()) {
497             {
498                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
499
500                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
501
502                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
503
504                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
505                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
506
507                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
508
509                 // Wait for akka's recovery to complete so it doesn't interfere.
510                 mockRaftActor.waitForRecoveryComplete();
511
512                 ByteString snapshotBytes  = fromObject(Arrays.asList(
513                         new MockRaftActorContext.MockPayload("A"),
514                         new MockRaftActorContext.MockPayload("B"),
515                         new MockRaftActorContext.MockPayload("C"),
516                         new MockRaftActorContext.MockPayload("D")));
517
518                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
519                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
520
521                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
522
523                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
524
525                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
526
527                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
528
529                 assertEquals("add replicated log entry", 0, replicatedLog.size());
530
531                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
532
533                 assertEquals("add replicated log entry", 0, replicatedLog.size());
534
535                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
536
537                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
538
539                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
540
541                 assertEquals("remove log entries", 0, replicatedLog.size());
542
543                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
544
545                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
546                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
547
548                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
549
550                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
551             }};
552     }
553
554
555     @Test
556     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
557         new JavaTestKit(getSystem()) {
558             {
559                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
560
561                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
562
563                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
564
565                 CountDownLatch persistLatch = new CountDownLatch(1);
566                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
567                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
568
569                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
570                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
571
572                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
573
574                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
575
576                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
577
578                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
579
580             }
581         };
582     }
583
584     @Test
585     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
586         new JavaTestKit(getSystem()) {
587             {
588                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
589
590                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
591
592                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
593
594                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
595
596                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
597                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
598
599                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
600
601                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
602
603                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
604
605                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
606
607                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
608
609             }
610         };
611     }
612
613     @Test
614     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
615         new JavaTestKit(getSystem()) {
616             {
617                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
618
619                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
620
621                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
622
623                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
624
625                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
626                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
627
628                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
629
630                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
631
632                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
633
634                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
635
636                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
637
638             }
639         };
640     }
641
642     @Test
643     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
644         new JavaTestKit(getSystem()) {
645             {
646                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
647
648                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
649
650                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
651
652                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
653
654                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
655                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
656
657                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
658
659                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
660
661                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
662
663                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
664
665             }
666         };
667     }
668
669     @Test
670     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
671         new JavaTestKit(getSystem()) {
672             {
673                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
674
675                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
676
677                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
678
679                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
680
681                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
682                     MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
683                         Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
684
685                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
686
687                 ByteString snapshotBytes  = fromObject(Arrays.asList(
688                         new MockRaftActorContext.MockPayload("A"),
689                         new MockRaftActorContext.MockPayload("B"),
690                         new MockRaftActorContext.MockPayload("C"),
691                         new MockRaftActorContext.MockPayload("D")));
692
693                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
694
695                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
696
697                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
698
699                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
700
701                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
702
703                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
704
705             }
706         };
707     }
708
709     @Test
710     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
711         new JavaTestKit(getSystem()) {
712             {
713                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
714
715                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
716
717                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
718
719                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
720
721                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
722                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
723
724                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
725
726                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
727                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
728                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
729                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
730                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
731
732                 ByteString snapshotBytes = fromObject(Arrays.asList(
733                         new MockRaftActorContext.MockPayload("A"),
734                         new MockRaftActorContext.MockPayload("B"),
735                         new MockRaftActorContext.MockPayload("C"),
736                         new MockRaftActorContext.MockPayload("D")));
737
738                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
739                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
740
741                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
742
743                 verify(mockRaftActor.delegate).createSnapshot();
744
745                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
746
747                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
748
749                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
750
751                 verify(dataPersistenceProvider).deleteMessages(100);
752
753                 assertEquals(2, mockRaftActor.getReplicatedLog().size());
754
755                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
756                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
757
758                 // Index 2 will not be in the log because it was removed due to snapshotting
759                 assertNull(mockRaftActor.getReplicatedLog().get(2));
760
761                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
762
763             }
764         };
765     }
766
767     @Test
768     public void testApplyState() throws Exception {
769
770         new JavaTestKit(getSystem()) {
771             {
772                 String persistenceId = "testApplyState";
773
774                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
775
776                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
777
778                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
779
780                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
781                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
782
783                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
784
785                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
786                         new MockRaftActorContext.MockPayload("F"));
787
788                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
789
790                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
791
792                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
793
794             }
795         };
796     }
797
798     @Test
799     public void testApplySnapshot() throws Exception {
800         new JavaTestKit(getSystem()) {
801             {
802                 String persistenceId = "testApplySnapshot";
803
804                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
805
806                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
807
808                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
809
810                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
811                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
812
813                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
814
815                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
816
817                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
818                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
819                 oldReplicatedLog.append(
820                     new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
821                         mock(Payload.class)));
822
823                 ByteString snapshotBytes = fromObject(Arrays.asList(
824                     new MockRaftActorContext.MockPayload("A"),
825                     new MockRaftActorContext.MockPayload("B"),
826                     new MockRaftActorContext.MockPayload("C"),
827                     new MockRaftActorContext.MockPayload("D")));
828
829                 Snapshot snapshot = mock(Snapshot.class);
830
831                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
832
833                 doReturn(3L).when(snapshot).getLastAppliedIndex();
834
835                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
836
837                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
838
839                 assertTrue("The replicatedLog should have changed",
840                     oldReplicatedLog != mockRaftActor.getReplicatedLog());
841
842                 assertEquals("lastApplied should be same as in the snapshot",
843                     (Long) 3L, mockRaftActor.getLastApplied());
844
845                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
846
847                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
848
849             }
850         };
851     }
852
853     @Test
854     public void testSaveSnapshotFailure() throws Exception {
855         new JavaTestKit(getSystem()) {
856             {
857                 String persistenceId = "testSaveSnapshotFailure";
858
859                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
860
861                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
862
863                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
864
865                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
866                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
867
868                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
869
870                 ByteString snapshotBytes  = fromObject(Arrays.asList(
871                         new MockRaftActorContext.MockPayload("A"),
872                         new MockRaftActorContext.MockPayload("B"),
873                         new MockRaftActorContext.MockPayload("C"),
874                         new MockRaftActorContext.MockPayload("D")));
875
876                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
877
878                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
879
880                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
881
882                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
883
884                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
885                         new Exception()));
886
887                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
888                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
889
890                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
891
892             }
893         };
894     }
895
896     @Test
897     public void testRaftRoleChangeNotifier() throws Exception {
898         new JavaTestKit(getSystem()) {{
899             ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
900             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
901             String id = "testRaftRoleChangeNotifier";
902
903             TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
904                 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
905
906             // sleeping for a minimum of 2 seconds, if it spans more its fine.
907             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
908
909             List<Object> matches =  MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
910             assertNotNull(matches);
911             assertEquals(3, matches.size());
912
913             // check if the notifier got a role change from null to Follower
914             RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
915             assertEquals(id, raftRoleChanged.getMemberId());
916             assertNull(raftRoleChanged.getOldRole());
917             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
918
919             // check if the notifier got a role change from Follower to Candidate
920             raftRoleChanged = (RoleChanged) matches.get(1);
921             assertEquals(id, raftRoleChanged.getMemberId());
922             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
923             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
924
925             // check if the notifier got a role change from Candidate to Leader
926             raftRoleChanged = (RoleChanged) matches.get(2);
927             assertEquals(id, raftRoleChanged.getMemberId());
928             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
929             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
930         }};
931     }
932
933     @Test
934     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
935         new JavaTestKit(getSystem()) {
936             {
937                 String persistenceId = "leader1";
938
939                 ActorRef followerActor1 =
940                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
941
942                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
943                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
944                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
945
946                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
947
948                 Map<String, String> peerAddresses = new HashMap<>();
949                 peerAddresses.put("follower-1", followerActor1.path().toString());
950
951                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
952                         MockRaftActor.props(persistenceId, peerAddresses,
953                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
954
955                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
956                 leaderActor.getRaftActorContext().setCommitIndex(4);
957                 leaderActor.getRaftActorContext().setLastApplied(4);
958                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
959
960                 leaderActor.waitForInitializeBehaviorComplete();
961
962                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
963
964                 Leader leader = new Leader(leaderActor.getRaftActorContext());
965                 leaderActor.setCurrentBehavior(leader);
966                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
967
968                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
969                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
970
971                 assertEquals(8, leaderActor.getReplicatedLog().size());
972
973                 leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
974                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
975                 verify(leaderActor.delegate).createSnapshot();
976
977                 assertEquals(8, leaderActor.getReplicatedLog().size());
978
979                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
980                 //fake snapshot on index 5
981                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
982
983                 assertEquals(8, leaderActor.getReplicatedLog().size());
984
985                 //fake snapshot on index 6
986                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
987                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
988                 assertEquals(8, leaderActor.getReplicatedLog().size());
989
990                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
991
992                 assertEquals(8, leaderActor.getReplicatedLog().size());
993
994                 ByteString snapshotBytes  = fromObject(Arrays.asList(
995                         new MockRaftActorContext.MockPayload("foo-0"),
996                         new MockRaftActorContext.MockPayload("foo-1"),
997                         new MockRaftActorContext.MockPayload("foo-2"),
998                         new MockRaftActorContext.MockPayload("foo-3"),
999                         new MockRaftActorContext.MockPayload("foo-4")));
1000                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1001                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1002
1003                 // capture snapshot reply should remove the snapshotted entries only
1004                 assertEquals(3, leaderActor.getReplicatedLog().size());
1005                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1006
1007                 // add another non-replicated entry
1008                 leaderActor.getReplicatedLog().append(
1009                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1010
1011                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1012                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
1013                 assertEquals(2, leaderActor.getReplicatedLog().size());
1014                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1015
1016                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1017
1018             }
1019         };
1020     }
1021
1022     @Test
1023     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1024         new JavaTestKit(getSystem()) {
1025             {
1026                 String persistenceId = "follower1";
1027
1028                 ActorRef leaderActor1 =
1029                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
1030
1031                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1032                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1033                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1034
1035                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1036
1037                 Map<String, String> peerAddresses = new HashMap<>();
1038                 peerAddresses.put("leader", leaderActor1.path().toString());
1039
1040                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1041                         MockRaftActor.props(persistenceId, peerAddresses,
1042                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1043
1044                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1045                 followerActor.getRaftActorContext().setCommitIndex(4);
1046                 followerActor.getRaftActorContext().setLastApplied(4);
1047                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1048
1049                 followerActor.waitForInitializeBehaviorComplete();
1050
1051                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1052                 Follower follower = new Follower(followerActor.getRaftActorContext());
1053                 followerActor.setCurrentBehavior(follower);
1054                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1055
1056                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1057                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1058
1059                 // log as indices 0-5
1060                 assertEquals(6, followerActor.getReplicatedLog().size());
1061
1062                 //snapshot on 4
1063                 followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
1064                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1065                 verify(followerActor.delegate).createSnapshot();
1066
1067                 assertEquals(6, followerActor.getReplicatedLog().size());
1068
1069                 //fake snapshot on index 6
1070                 List<ReplicatedLogEntry> entries =
1071                         Arrays.asList(
1072                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1073                                         new MockRaftActorContext.MockPayload("foo-6"))
1074                         );
1075                 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
1076                 assertEquals(7, followerActor.getReplicatedLog().size());
1077
1078                 //fake snapshot on index 7
1079                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1080
1081                 entries =
1082                         Arrays.asList(
1083                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1084                                         new MockRaftActorContext.MockPayload("foo-7"))
1085                         );
1086                 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
1087                 assertEquals(8, followerActor.getReplicatedLog().size());
1088
1089                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1090
1091
1092                 ByteString snapshotBytes  = fromObject(Arrays.asList(
1093                         new MockRaftActorContext.MockPayload("foo-0"),
1094                         new MockRaftActorContext.MockPayload("foo-1"),
1095                         new MockRaftActorContext.MockPayload("foo-2"),
1096                         new MockRaftActorContext.MockPayload("foo-3"),
1097                         new MockRaftActorContext.MockPayload("foo-4")));
1098                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1099                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1100
1101                 // capture snapshot reply should remove the snapshotted entries only
1102                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1103                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1104
1105                 entries =
1106                         Arrays.asList(
1107                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1108                                         new MockRaftActorContext.MockPayload("foo-7"))
1109                         );
1110                 // send an additional entry 8 with leaderCommit = 7
1111                 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
1112
1113                 // 7 and 8, as lastapplied is 7
1114                 assertEquals(2, followerActor.getReplicatedLog().size());
1115
1116                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1117
1118             }
1119         };
1120     }
1121
1122     @Test
1123     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1124         new JavaTestKit(getSystem()) {
1125             {
1126                 String persistenceId = "leader1";
1127
1128                 ActorRef followerActor1 =
1129                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
1130                 ActorRef followerActor2 =
1131                         getSystem().actorOf(Props.create(MessageCollectorActor.class));
1132
1133                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1134                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1135                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1136
1137                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1138
1139                 Map<String, String> peerAddresses = new HashMap<>();
1140                 peerAddresses.put("follower-1", followerActor1.path().toString());
1141                 peerAddresses.put("follower-2", followerActor2.path().toString());
1142
1143                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1144                         MockRaftActor.props(persistenceId, peerAddresses,
1145                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1146
1147                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1148                 leaderActor.getRaftActorContext().setCommitIndex(9);
1149                 leaderActor.getRaftActorContext().setLastApplied(9);
1150                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1151
1152                 leaderActor.waitForInitializeBehaviorComplete();
1153
1154                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1155                 leaderActor.setCurrentBehavior(leader);
1156                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1157
1158                 // create 5 entries in the log
1159                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1160                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1161                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1162                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1163                 assertEquals(5, leaderActor.getReplicatedLog().size());
1164
1165                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1));
1166                 assertEquals(5, leaderActor.getReplicatedLog().size());
1167
1168                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1169                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1));
1170                 assertEquals(5, leaderActor.getReplicatedLog().size());
1171
1172                 // simulate a real snapshot
1173                 leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
1174                 assertEquals(5, leaderActor.getReplicatedLog().size());
1175                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1176                         leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId())
1177                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1178
1179
1180                 //reply from a slow follower does not initiate a fake snapshot
1181                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
1182                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1183
1184                 ByteString snapshotBytes  = fromObject(Arrays.asList(
1185                         new MockRaftActorContext.MockPayload("foo-0"),
1186                         new MockRaftActorContext.MockPayload("foo-1"),
1187                         new MockRaftActorContext.MockPayload("foo-2"),
1188                         new MockRaftActorContext.MockPayload("foo-3"),
1189                         new MockRaftActorContext.MockPayload("foo-4")));
1190                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1191                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1192
1193                 assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
1194
1195                 //reply from a slow follower after should not raise errors
1196                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1));
1197                 assertEquals(0, leaderActor.getReplicatedLog().size());
1198
1199                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1200
1201             }
1202         };
1203     }
1204
1205
1206
1207     private ByteString fromObject(Object snapshot) throws Exception {
1208         ByteArrayOutputStream b = null;
1209         ObjectOutputStream o = null;
1210         try {
1211             b = new ByteArrayOutputStream();
1212             o = new ObjectOutputStream(b);
1213             o.writeObject(snapshot);
1214             byte[] snapshotBytes = b.toByteArray();
1215             return ByteString.copyFrom(snapshotBytes);
1216         } finally {
1217             if (o != null) {
1218                 o.flush();
1219                 o.close();
1220             }
1221             if (b != null) {
1222                 b.close();
1223             }
1224         }
1225     }
1226 }