Merge "BUG-2424 Bump mina sshd-core version to 0.14"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
58 import org.opendaylight.controller.cluster.notifications.RoleChanged;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
68 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
69 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
70 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
71 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
72 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
73 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
74 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
75 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
76 import scala.concurrent.Await;
77 import scala.concurrent.Future;
78 import scala.concurrent.duration.Duration;
79 import scala.concurrent.duration.FiniteDuration;
80
81 public class RaftActorTest extends AbstractActorTest {
82
83     private TestActorFactory factory;
84
85     @Before
86     public void setUp(){
87         factory = new TestActorFactory(getSystem());
88     }
89
90     @After
91     public void tearDown() throws Exception {
92         factory.close();
93         InMemoryJournal.clear();
94         InMemorySnapshotStore.clear();
95     }
96
97     public static class MockRaftActor extends RaftActor {
98
99         protected DataPersistenceProvider dataPersistenceProvider;
100         private final RaftActor delegate;
101         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
102         private final List<Object> state;
103         private ActorRef roleChangeNotifier;
104         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
105
106         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
107             private static final long serialVersionUID = 1L;
108             private final Map<String, String> peerAddresses;
109             private final String id;
110             private final Optional<ConfigParams> config;
111             private final DataPersistenceProvider dataPersistenceProvider;
112             private final ActorRef roleChangeNotifier;
113
114             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
115                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
116                 ActorRef roleChangeNotifier) {
117                 this.peerAddresses = peerAddresses;
118                 this.id = id;
119                 this.config = config;
120                 this.dataPersistenceProvider = dataPersistenceProvider;
121                 this.roleChangeNotifier = roleChangeNotifier;
122             }
123
124             @Override
125             public MockRaftActor create() throws Exception {
126                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
127                     dataPersistenceProvider);
128                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
129                 return mockRaftActor;
130             }
131         }
132
133         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
134                              DataPersistenceProvider dataPersistenceProvider) {
135             super(id, peerAddresses, config);
136             state = new ArrayList<>();
137             this.delegate = mock(RaftActor.class);
138             if(dataPersistenceProvider == null){
139                 this.dataPersistenceProvider = new PersistentDataProvider();
140             } else {
141                 this.dataPersistenceProvider = dataPersistenceProvider;
142             }
143         }
144
145         public void waitForRecoveryComplete() {
146             try {
147                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
148             } catch (InterruptedException e) {
149                 e.printStackTrace();
150             }
151         }
152
153         public void waitForInitializeBehaviorComplete() {
154             try {
155                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
156             } catch (InterruptedException e) {
157                 e.printStackTrace();
158             }
159         }
160
161         public List<Object> getState() {
162             return state;
163         }
164
165         public static Props props(final String id, final Map<String, String> peerAddresses,
166                 Optional<ConfigParams> config){
167             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
168         }
169
170         public static Props props(final String id, final Map<String, String> peerAddresses,
171                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
172             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
173         }
174
175         public static Props props(final String id, final Map<String, String> peerAddresses,
176             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
177             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
178         }
179
180         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
181             delegate.applyState(clientActor, identifier, data);
182             LOG.info("{}: applyState called", persistenceId());
183         }
184
185         @Override
186         protected void startLogRecoveryBatch(int maxBatchSize) {
187         }
188
189         @Override
190         protected void appendRecoveredLogEntry(Payload data) {
191             state.add(data);
192         }
193
194         @Override
195         protected void applyCurrentLogRecoveryBatch() {
196         }
197
198         @Override
199         protected void onRecoveryComplete() {
200             delegate.onRecoveryComplete();
201             recoveryComplete.countDown();
202         }
203
204         @Override
205         protected void initializeBehavior() {
206             super.initializeBehavior();
207             initializeBehaviorComplete.countDown();
208         }
209
210         @Override
211         protected void applyRecoverySnapshot(byte[] bytes) {
212             delegate.applyRecoverySnapshot(bytes);
213             try {
214                 Object data = toObject(bytes);
215                 if (data instanceof List) {
216                     state.addAll((List<?>) data);
217                 }
218             } catch (Exception e) {
219                 e.printStackTrace();
220             }
221         }
222
223         @Override protected void createSnapshot() {
224             LOG.info("{}: createSnapshot called", persistenceId());
225             delegate.createSnapshot();
226         }
227
228         @Override protected void applySnapshot(byte [] snapshot) {
229             LOG.info("{}: applySnapshot called", persistenceId());
230             delegate.applySnapshot(snapshot);
231         }
232
233         @Override protected void onStateChanged() {
234             delegate.onStateChanged();
235         }
236
237         @Override
238         protected DataPersistenceProvider persistence() {
239             return this.dataPersistenceProvider;
240         }
241
242         @Override
243         protected Optional<ActorRef> getRoleChangeNotifier() {
244             return Optional.fromNullable(roleChangeNotifier);
245         }
246
247         @Override public String persistenceId() {
248             return this.getId();
249         }
250
251         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
252             Object obj = null;
253             ByteArrayInputStream bis = null;
254             ObjectInputStream ois = null;
255             try {
256                 bis = new ByteArrayInputStream(bs);
257                 ois = new ObjectInputStream(bis);
258                 obj = ois.readObject();
259             } finally {
260                 if (bis != null) {
261                     bis.close();
262                 }
263                 if (ois != null) {
264                     ois.close();
265                 }
266             }
267             return obj;
268         }
269
270         public ReplicatedLog getReplicatedLog(){
271             return this.getRaftActorContext().getReplicatedLog();
272         }
273
274     }
275
276
277     public static class RaftActorTestKit extends JavaTestKit {
278         private final ActorRef raftActor;
279
280         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
281             super(actorSystem);
282
283             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
284                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
285
286         }
287
288
289         public ActorRef getRaftActor() {
290             return raftActor;
291         }
292
293         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
294             // Wait for a specific log message to show up
295             return
296                 new JavaTestKit.EventFilter<Boolean>(logEventClass
297                 ) {
298                     @Override
299                     protected Boolean run() {
300                         return true;
301                     }
302                 }.from(raftActor.path().toString())
303                     .message(message)
304                     .occurrences(1).exec();
305
306
307         }
308
309         protected void waitUntilLeader(){
310             waitUntilLeader(raftActor);
311         }
312
313         public static void waitUntilLeader(ActorRef actorRef) {
314             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
315             for(int i = 0; i < 20 * 5; i++) {
316                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
317                 try {
318                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
319                     if(resp.getLeaderActor() != null) {
320                         return;
321                     }
322                 } catch(TimeoutException e) {
323                 } catch(Exception e) {
324                     System.err.println("FindLeader threw ex");
325                     e.printStackTrace();
326                 }
327
328
329                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
330             }
331
332             Assert.fail("Leader not found for actorRef " + actorRef.path());
333         }
334
335     }
336
337
338     @Test
339     public void testConstruction() {
340         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
341     }
342
343     @Test
344     public void testFindLeaderWhenLeaderIsSelf(){
345         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
346         kit.waitUntilLeader();
347     }
348
349     @Test
350     public void testRaftActorRecovery() throws Exception {
351         new JavaTestKit(getSystem()) {{
352             String persistenceId = factory.generateActorId("follower-");
353
354             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
355             // Set the heartbeat interval high to essentially disable election otherwise the test
356             // may fail if the actor is switched to Leader and the commitIndex is set to the last
357             // log entry.
358             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
359
360             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
361                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
362
363             watch(followerActor);
364
365             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
366             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
367                     new MockRaftActorContext.MockPayload("E"));
368             snapshotUnappliedEntries.add(entry1);
369
370             int lastAppliedDuringSnapshotCapture = 3;
371             int lastIndexDuringSnapshotCapture = 4;
372
373             // 4 messages as part of snapshot, which are applied to state
374             ByteString snapshotBytes = fromObject(Arrays.asList(
375                     new MockRaftActorContext.MockPayload("A"),
376                     new MockRaftActorContext.MockPayload("B"),
377                     new MockRaftActorContext.MockPayload("C"),
378                     new MockRaftActorContext.MockPayload("D")));
379
380             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
381                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
382                     lastAppliedDuringSnapshotCapture, 1);
383             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
384
385             // add more entries after snapshot is taken
386             List<ReplicatedLogEntry> entries = new ArrayList<>();
387             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
388                     new MockRaftActorContext.MockPayload("F"));
389             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
390                     new MockRaftActorContext.MockPayload("G"));
391             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
392                     new MockRaftActorContext.MockPayload("H"));
393             entries.add(entry2);
394             entries.add(entry3);
395             entries.add(entry4);
396
397             int lastAppliedToState = 5;
398             int lastIndex = 7;
399
400             InMemoryJournal.addEntry(persistenceId, 5, entry2);
401             // 2 entries are applied to state besides the 4 entries in snapshot
402             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
403             InMemoryJournal.addEntry(persistenceId, 7, entry3);
404             InMemoryJournal.addEntry(persistenceId, 8, entry4);
405
406             // kill the actor
407             followerActor.tell(PoisonPill.getInstance(), null);
408             expectMsgClass(duration("5 seconds"), Terminated.class);
409
410             unwatch(followerActor);
411
412             //reinstate the actor
413             TestActorRef<MockRaftActor> ref = factory.createTestActor(
414                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
415                             Optional.<ConfigParams>of(config)));
416
417             ref.underlyingActor().waitForRecoveryComplete();
418
419             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
420             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
421                     context.getReplicatedLog().size());
422             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
423             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
424             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
425             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
426         }};
427     }
428
429     @Test
430     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
431         new JavaTestKit(getSystem()) {{
432             String persistenceId = factory.generateActorId("leader-");
433
434             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
435             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
436
437             // Setup the persisted journal with some entries
438             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
439                     new MockRaftActorContext.MockPayload("zero"));
440             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
441                     new MockRaftActorContext.MockPayload("oen"));
442             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
443                     new MockRaftActorContext.MockPayload("two"));
444
445             long seqNr = 1;
446             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
447             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
448             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
449             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
450
451             int lastAppliedToState = 1;
452             int lastIndex = 2;
453
454             //reinstate the actor
455             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
456                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
457                             Optional.<ConfigParams>of(config)));
458
459             leaderActor.underlyingActor().waitForRecoveryComplete();
460
461             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
462             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
463             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
464             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
465             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
466         }};
467     }
468
469     /**
470      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
471      * process recovery messages
472      *
473      * @throws Exception
474      */
475
476     @Test
477     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
478         new JavaTestKit(getSystem()) {
479             {
480                 String persistenceId = factory.generateActorId("leader-");
481
482                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
483
484                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
485
486                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
487                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
488
489                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
490
491                 // Wait for akka's recovery to complete so it doesn't interfere.
492                 mockRaftActor.waitForRecoveryComplete();
493
494                 ByteString snapshotBytes = fromObject(Arrays.asList(
495                         new MockRaftActorContext.MockPayload("A"),
496                         new MockRaftActorContext.MockPayload("B"),
497                         new MockRaftActorContext.MockPayload("C"),
498                         new MockRaftActorContext.MockPayload("D")));
499
500                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
501                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
502
503                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
504
505                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
506
507                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
508
509                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
510
511                 assertEquals("add replicated log entry", 1, replicatedLog.size());
512
513                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
514
515                 assertEquals("add replicated log entry", 2, replicatedLog.size());
516
517                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
518
519                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
520
521                 // The snapshot had 4 items + we added 2 more items during the test
522                 // We start removing from 5 and we should get 1 item in the replicated log
523                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
524
525                 assertEquals("remove log entries", 1, replicatedLog.size());
526
527                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
528
529                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
530                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
531
532                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
533
534             }};
535     }
536
537     /**
538      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
539      * not process recovery messages
540      *
541      * @throws Exception
542      */
543     @Test
544     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
545         new JavaTestKit(getSystem()) {
546             {
547                 String persistenceId = factory.generateActorId("leader-");
548
549                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
550
551                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
552
553                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
554                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
555
556                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
557
558                 // Wait for akka's recovery to complete so it doesn't interfere.
559                 mockRaftActor.waitForRecoveryComplete();
560
561                 ByteString snapshotBytes = fromObject(Arrays.asList(
562                         new MockRaftActorContext.MockPayload("A"),
563                         new MockRaftActorContext.MockPayload("B"),
564                         new MockRaftActorContext.MockPayload("C"),
565                         new MockRaftActorContext.MockPayload("D")));
566
567                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
568                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
569
570                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
571
572                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
573
574                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
575
576                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
577
578                 assertEquals("add replicated log entry", 0, replicatedLog.size());
579
580                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
581
582                 assertEquals("add replicated log entry", 0, replicatedLog.size());
583
584                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
585
586                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
587
588                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
589
590                 assertEquals("remove log entries", 0, replicatedLog.size());
591
592                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
593
594                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
595                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
596
597                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
598             }};
599     }
600
601
602     @Test
603     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
604         new JavaTestKit(getSystem()) {
605             {
606                 String persistenceId = factory.generateActorId("leader-");
607
608                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
609
610                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
611
612                 CountDownLatch persistLatch = new CountDownLatch(1);
613                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
614                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
615
616                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
617                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
618
619                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
620
621                 mockRaftActor.waitForInitializeBehaviorComplete();
622
623                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
624
625                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
626             }
627         };
628     }
629
630     @Test
631     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
632         new JavaTestKit(getSystem()) {
633             {
634                 String persistenceId = factory.generateActorId("leader-");
635
636                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
637
638                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
639
640                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
641
642                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
643                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
644
645                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
646
647                 mockRaftActor.waitForInitializeBehaviorComplete();
648
649                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
650
651                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
652
653                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
654             }
655         };
656     }
657
658     @Test
659     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
660         new JavaTestKit(getSystem()) {
661             {
662                 String persistenceId = factory.generateActorId("leader-");
663
664                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
665
666                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
667
668                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
669
670                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
671                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
672
673                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
674
675                 mockRaftActor.waitForInitializeBehaviorComplete();
676
677                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
678
679                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
680
681                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
682             }
683         };
684     }
685
686     @Test
687     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
688         new JavaTestKit(getSystem()) {
689             {
690                 String persistenceId = factory.generateActorId("leader-");
691
692                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
693
694                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
695
696                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
697
698                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
699                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
700
701                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
702
703                 mockRaftActor.waitForInitializeBehaviorComplete();
704
705                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
706
707                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
708
709             }
710
711         };
712     }
713
714     @Test
715     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
716         new JavaTestKit(getSystem()) {
717             {
718                 String persistenceId = factory.generateActorId("leader-");
719
720                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
721
722                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
723
724                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
725
726                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
727                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
728                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
729
730                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
731
732                 mockRaftActor.waitForInitializeBehaviorComplete();
733
734                 ByteString snapshotBytes = fromObject(Arrays.asList(
735                         new MockRaftActorContext.MockPayload("A"),
736                         new MockRaftActorContext.MockPayload("B"),
737                         new MockRaftActorContext.MockPayload("C"),
738                         new MockRaftActorContext.MockPayload("D")));
739
740                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
741
742                 raftActorContext.getSnapshotManager().capture(
743                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
744                                 new MockRaftActorContext.MockPayload("D")), -1);
745
746                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
747
748                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
749
750                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
751
752             }
753         };
754     }
755
756     @Test
757     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
758         new JavaTestKit(getSystem()) {
759             {
760                 String persistenceId = factory.generateActorId("leader-");
761
762                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
763
764                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
765
766                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
767
768                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
769                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
770
771                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
772
773                 mockRaftActor.waitForInitializeBehaviorComplete();
774                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
775
776                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
777                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
778                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
779                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
780                 mockRaftActor.getReplicatedLog().append(lastEntry);
781
782                 ByteString snapshotBytes = fromObject(Arrays.asList(
783                         new MockRaftActorContext.MockPayload("A"),
784                         new MockRaftActorContext.MockPayload("B"),
785                         new MockRaftActorContext.MockPayload("C"),
786                         new MockRaftActorContext.MockPayload("D")));
787
788                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
789                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
790
791                 long replicatedToAllIndex = 1;
792
793                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
794
795                 verify(mockRaftActor.delegate).createSnapshot();
796
797                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
798
799                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
800
801                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
802
803                 verify(dataPersistenceProvider).deleteMessages(100);
804
805                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
806                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
807
808                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
809                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
810                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
811
812                 // Index 2 will not be in the log because it was removed due to snapshotting
813                 assertNull(mockRaftActor.getReplicatedLog().get(1));
814                 assertNull(mockRaftActor.getReplicatedLog().get(0));
815
816             }
817         };
818     }
819
820     @Test
821     public void testApplyState() throws Exception {
822
823         new JavaTestKit(getSystem()) {
824             {
825                 String persistenceId = factory.generateActorId("leader-");
826
827                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
828
829                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
830
831                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
832
833                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
834                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
835
836                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
837
838                 mockRaftActor.waitForInitializeBehaviorComplete();
839
840                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
841                         new MockRaftActorContext.MockPayload("F"));
842
843                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
844
845                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
846
847             }
848         };
849     }
850
851     @Test
852     public void testApplySnapshot() throws Exception {
853         new JavaTestKit(getSystem()) {
854             {
855                 String persistenceId = factory.generateActorId("leader-");
856
857                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
858
859                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
860
861                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
862
863                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
864                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
865
866                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
867
868                 mockRaftActor.waitForInitializeBehaviorComplete();
869
870                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
871
872                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
873                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
874                 oldReplicatedLog.append(
875                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
876                                 mock(Payload.class)));
877
878                 ByteString snapshotBytes = fromObject(Arrays.asList(
879                         new MockRaftActorContext.MockPayload("A"),
880                         new MockRaftActorContext.MockPayload("B"),
881                         new MockRaftActorContext.MockPayload("C"),
882                         new MockRaftActorContext.MockPayload("D")));
883
884                 Snapshot snapshot = mock(Snapshot.class);
885
886                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
887
888                 doReturn(3L).when(snapshot).getLastAppliedIndex();
889
890                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
891
892                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
893
894                 assertTrue("The replicatedLog should have changed",
895                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
896
897                 assertEquals("lastApplied should be same as in the snapshot",
898                         (Long) 3L, mockRaftActor.getLastApplied());
899
900                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
901
902             }
903         };
904     }
905
906     @Test
907     public void testSaveSnapshotFailure() throws Exception {
908         new JavaTestKit(getSystem()) {
909             {
910                 String persistenceId = factory.generateActorId("leader-");
911
912                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
913
914                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
915
916                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
917
918                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
919                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
920
921                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
922
923                 mockRaftActor.waitForInitializeBehaviorComplete();
924
925                 ByteString snapshotBytes = fromObject(Arrays.asList(
926                         new MockRaftActorContext.MockPayload("A"),
927                         new MockRaftActorContext.MockPayload("B"),
928                         new MockRaftActorContext.MockPayload("C"),
929                         new MockRaftActorContext.MockPayload("D")));
930
931                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
932
933                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
934
935                 raftActorContext.getSnapshotManager().capture(
936                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
937                                 new MockRaftActorContext.MockPayload("D")), 1);
938
939                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
940
941                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
942                         new Exception()));
943
944                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
945                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
946
947             }
948         };
949     }
950
951     @Test
952     public void testRaftRoleChangeNotifier() throws Exception {
953         new JavaTestKit(getSystem()) {{
954             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
955                     Props.create(MessageCollectorActor.class));
956             MessageCollectorActor.waitUntilReady(notifierActor);
957
958             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
959             long heartBeatInterval = 100;
960             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
961             config.setElectionTimeoutFactor(1);
962
963             String persistenceId = factory.generateActorId("notifier-");
964
965             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
966                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
967
968             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
969
970             // check if the notifier got a role change from null to Follower
971             RoleChanged raftRoleChanged = matches.get(0);
972             assertEquals(persistenceId, raftRoleChanged.getMemberId());
973             assertNull(raftRoleChanged.getOldRole());
974             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
975
976             // check if the notifier got a role change from Follower to Candidate
977             raftRoleChanged = matches.get(1);
978             assertEquals(persistenceId, raftRoleChanged.getMemberId());
979             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
980             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
981
982             // check if the notifier got a role change from Candidate to Leader
983             raftRoleChanged = matches.get(2);
984             assertEquals(persistenceId, raftRoleChanged.getMemberId());
985             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
986             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
987
988             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
989                     notifierActor, LeaderStateChanged.class);
990
991             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
992
993             notifierActor.underlyingActor().clear();
994
995             MockRaftActor raftActor = raftActorRef.underlyingActor();
996             final String newLeaderId = "new-leader";
997             Follower follower = new Follower(raftActor.getRaftActorContext()) {
998                 @Override
999                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1000                     leaderId = newLeaderId;
1001                     return this;
1002                 }
1003             };
1004
1005             raftActor.changeCurrentBehavior(follower);
1006
1007             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1008             assertEquals(persistenceId, leaderStateChange.getMemberId());
1009             assertEquals(null, leaderStateChange.getLeaderId());
1010
1011             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1012             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1013             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1014
1015             notifierActor.underlyingActor().clear();
1016
1017             raftActor.handleCommand("any");
1018
1019             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1020             assertEquals(persistenceId, leaderStateChange.getMemberId());
1021             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1022         }};
1023     }
1024
1025     @Test
1026     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1027         new JavaTestKit(getSystem()) {
1028             {
1029                 String persistenceId = factory.generateActorId("leader-");
1030                 String follower1Id = factory.generateActorId("follower-");
1031
1032                 ActorRef followerActor1 =
1033                         factory.createActor(Props.create(MessageCollectorActor.class));
1034
1035                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1036                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1037                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1038
1039                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1040
1041                 Map<String, String> peerAddresses = new HashMap<>();
1042                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1043
1044                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1045                         MockRaftActor.props(persistenceId, peerAddresses,
1046                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1047
1048                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1049
1050                 leaderActor.getRaftActorContext().setCommitIndex(4);
1051                 leaderActor.getRaftActorContext().setLastApplied(4);
1052                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1053
1054                 leaderActor.waitForInitializeBehaviorComplete();
1055
1056                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1057
1058                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1059                 leaderActor.setCurrentBehavior(leader);
1060                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1061
1062                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1063                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1064
1065                 assertEquals(8, leaderActor.getReplicatedLog().size());
1066
1067                 leaderActor.getRaftActorContext().getSnapshotManager()
1068                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1069                                 new MockRaftActorContext.MockPayload("x")), 4);
1070
1071                 verify(leaderActor.delegate).createSnapshot();
1072
1073                 assertEquals(8, leaderActor.getReplicatedLog().size());
1074
1075                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1076                 //fake snapshot on index 5
1077                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1078
1079                 assertEquals(8, leaderActor.getReplicatedLog().size());
1080
1081                 //fake snapshot on index 6
1082                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1083                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1084                 assertEquals(8, leaderActor.getReplicatedLog().size());
1085
1086                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1087
1088                 assertEquals(8, leaderActor.getReplicatedLog().size());
1089
1090                 ByteString snapshotBytes = fromObject(Arrays.asList(
1091                         new MockRaftActorContext.MockPayload("foo-0"),
1092                         new MockRaftActorContext.MockPayload("foo-1"),
1093                         new MockRaftActorContext.MockPayload("foo-2"),
1094                         new MockRaftActorContext.MockPayload("foo-3"),
1095                         new MockRaftActorContext.MockPayload("foo-4")));
1096
1097                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
1098                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1099
1100                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1101
1102                 // The commit is needed to complete the snapshot creation process
1103                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
1104
1105                 // capture snapshot reply should remove the snapshotted entries only
1106                 assertEquals(3, leaderActor.getReplicatedLog().size());
1107                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1108
1109                 // add another non-replicated entry
1110                 leaderActor.getReplicatedLog().append(
1111                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1112
1113                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1114                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1115                 assertEquals(2, leaderActor.getReplicatedLog().size());
1116                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1117
1118             }
1119         };
1120     }
1121
1122     @Test
1123     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1124         new JavaTestKit(getSystem()) {
1125             {
1126                 String persistenceId = factory.generateActorId("follower-");
1127                 String leaderId = factory.generateActorId("leader-");
1128
1129
1130                 ActorRef leaderActor1 =
1131                         factory.createActor(Props.create(MessageCollectorActor.class));
1132
1133                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1134                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1135                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1136
1137                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1138
1139                 Map<String, String> peerAddresses = new HashMap<>();
1140                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1141
1142                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1143                         MockRaftActor.props(persistenceId, peerAddresses,
1144                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1145
1146                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1147                 followerActor.getRaftActorContext().setCommitIndex(4);
1148                 followerActor.getRaftActorContext().setLastApplied(4);
1149                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1150
1151                 followerActor.waitForInitializeBehaviorComplete();
1152
1153
1154                 Follower follower = new Follower(followerActor.getRaftActorContext());
1155                 followerActor.setCurrentBehavior(follower);
1156                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1157
1158                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1159                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1160                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1161
1162                 // log has indices 0-5
1163                 assertEquals(6, followerActor.getReplicatedLog().size());
1164
1165                 //snapshot on 4
1166                 followerActor.getRaftActorContext().getSnapshotManager().capture(
1167                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1168                                 new MockRaftActorContext.MockPayload("D")), 4);
1169
1170                 verify(followerActor.delegate).createSnapshot();
1171
1172                 assertEquals(6, followerActor.getReplicatedLog().size());
1173
1174                 //fake snapshot on index 6
1175                 List<ReplicatedLogEntry> entries =
1176                         Arrays.asList(
1177                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1178                                         new MockRaftActorContext.MockPayload("foo-6"))
1179                         );
1180                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1181                 assertEquals(7, followerActor.getReplicatedLog().size());
1182
1183                 //fake snapshot on index 7
1184                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1185
1186                 entries =
1187                         Arrays.asList(
1188                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1189                                         new MockRaftActorContext.MockPayload("foo-7"))
1190                         );
1191                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1192                 assertEquals(8, followerActor.getReplicatedLog().size());
1193
1194                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1195
1196
1197                 ByteString snapshotBytes = fromObject(Arrays.asList(
1198                         new MockRaftActorContext.MockPayload("foo-0"),
1199                         new MockRaftActorContext.MockPayload("foo-1"),
1200                         new MockRaftActorContext.MockPayload("foo-2"),
1201                         new MockRaftActorContext.MockPayload("foo-3"),
1202                         new MockRaftActorContext.MockPayload("foo-4")));
1203                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1204                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1205
1206                 // The commit is needed to complete the snapshot creation process
1207                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
1208
1209                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1210                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1211                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1212
1213                 entries =
1214                         Arrays.asList(
1215                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1216                                         new MockRaftActorContext.MockPayload("foo-7"))
1217                         );
1218                 // send an additional entry 8 with leaderCommit = 7
1219                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1220
1221                 // 7 and 8, as lastapplied is 7
1222                 assertEquals(2, followerActor.getReplicatedLog().size());
1223
1224             }
1225         };
1226     }
1227
1228     @Test
1229     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1230         new JavaTestKit(getSystem()) {
1231             {
1232                 String persistenceId = factory.generateActorId("leader-");
1233                 String follower1Id = factory.generateActorId("follower-");
1234                 String follower2Id = factory.generateActorId("follower-");
1235
1236                 ActorRef followerActor1 =
1237                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1238                 ActorRef followerActor2 =
1239                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1240
1241                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1242                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1243                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1244
1245                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1246
1247                 Map<String, String> peerAddresses = new HashMap<>();
1248                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1249                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1250
1251                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1252                         MockRaftActor.props(persistenceId, peerAddresses,
1253                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1254
1255                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1256                 leaderActor.getRaftActorContext().setCommitIndex(9);
1257                 leaderActor.getRaftActorContext().setLastApplied(9);
1258                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1259
1260                 leaderActor.waitForInitializeBehaviorComplete();
1261
1262                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1263                 leaderActor.setCurrentBehavior(leader);
1264                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1265
1266                 // create 5 entries in the log
1267                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1268                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1269
1270                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1271                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1272                 //setting replicatedToAllIndex = 9, for the log to clear
1273                 leader.setReplicatedToAllIndex(9);
1274                 assertEquals(5, leaderActor.getReplicatedLog().size());
1275                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1276
1277                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1278                 assertEquals(5, leaderActor.getReplicatedLog().size());
1279                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1280
1281                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1282                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1283                 assertEquals(5, leaderActor.getReplicatedLog().size());
1284                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1285
1286                 // simulate a real snapshot
1287                 leaderActor.onReceiveCommand(new SendHeartBeat());
1288                 assertEquals(5, leaderActor.getReplicatedLog().size());
1289                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1290                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1291                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1292
1293
1294                 //reply from a slow follower does not initiate a fake snapshot
1295                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1296                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1297
1298                 ByteString snapshotBytes = fromObject(Arrays.asList(
1299                         new MockRaftActorContext.MockPayload("foo-0"),
1300                         new MockRaftActorContext.MockPayload("foo-1"),
1301                         new MockRaftActorContext.MockPayload("foo-2"),
1302                         new MockRaftActorContext.MockPayload("foo-3"),
1303                         new MockRaftActorContext.MockPayload("foo-4")));
1304                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1305                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1306
1307                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1308
1309                 //reply from a slow follower after should not raise errors
1310                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1311                 assertEquals(0, leaderActor.getReplicatedLog().size());
1312             }
1313         };
1314     }
1315
1316
1317     private static class NonPersistentProvider implements DataPersistenceProvider {
1318         @Override
1319         public boolean isRecoveryApplicable() {
1320             return false;
1321         }
1322
1323         @Override
1324         public <T> void persist(T o, Procedure<T> procedure) {
1325             try {
1326                 procedure.apply(o);
1327             } catch (Exception e) {
1328                 e.printStackTrace();
1329             }
1330         }
1331
1332         @Override
1333         public void saveSnapshot(Object o) {
1334
1335         }
1336
1337         @Override
1338         public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1339
1340         }
1341
1342         @Override
1343         public void deleteMessages(long sequenceNumber) {
1344
1345         }
1346     }
1347
1348     @Test
1349     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1350         new JavaTestKit(getSystem()) {{
1351             String persistenceId = factory.generateActorId("leader-");
1352             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1353             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1354             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1355             config.setSnapshotBatchCount(5);
1356
1357             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1358
1359             Map<String, String> peerAddresses = new HashMap<>();
1360
1361             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1362                     MockRaftActor.props(persistenceId, peerAddresses,
1363                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1364
1365             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1366             leaderActor.getRaftActorContext().setCommitIndex(3);
1367             leaderActor.getRaftActorContext().setLastApplied(3);
1368             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1369
1370             leaderActor.waitForInitializeBehaviorComplete();
1371             for(int i=0;i< 4;i++) {
1372                 leaderActor.getReplicatedLog()
1373                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1374                                 new MockRaftActorContext.MockPayload("A")));
1375             }
1376
1377             Leader leader = new Leader(leaderActor.getRaftActorContext());
1378             leaderActor.setCurrentBehavior(leader);
1379             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1380
1381             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1382             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1383
1384             // Now send a CaptureSnapshotReply
1385             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1386
1387             // Trimming log in this scenario is a no-op
1388             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1389             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1390             assertEquals(-1, leader.getReplicatedToAllIndex());
1391
1392         }};
1393     }
1394
1395     @Test
1396     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1397         new JavaTestKit(getSystem()) {{
1398             String persistenceId = factory.generateActorId("leader-");
1399             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1400             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1401             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1402             config.setSnapshotBatchCount(5);
1403
1404             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1405
1406             Map<String, String> peerAddresses = new HashMap<>();
1407
1408             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1409                     MockRaftActor.props(persistenceId, peerAddresses,
1410                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1411
1412             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1413             leaderActor.getRaftActorContext().setCommitIndex(3);
1414             leaderActor.getRaftActorContext().setLastApplied(3);
1415             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1416             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1417
1418             leaderActor.waitForInitializeBehaviorComplete();
1419             Leader leader = new Leader(leaderActor.getRaftActorContext());
1420             leaderActor.setCurrentBehavior(leader);
1421             leader.setReplicatedToAllIndex(3);
1422             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1423
1424             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1425             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1426
1427             // Now send a CaptureSnapshotReply
1428             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1429
1430             // Trimming log in this scenario is a no-op
1431             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1432             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1433             assertEquals(3, leader.getReplicatedToAllIndex());
1434
1435         }};
1436     }
1437
1438     private ByteString fromObject(Object snapshot) throws Exception {
1439         ByteArrayOutputStream b = null;
1440         ObjectOutputStream o = null;
1441         try {
1442             b = new ByteArrayOutputStream();
1443             o = new ObjectOutputStream(b);
1444             o.writeObject(snapshot);
1445             byte[] snapshotBytes = b.toByteArray();
1446             return ByteString.copyFrom(snapshotBytes);
1447         } finally {
1448             if (o != null) {
1449                 o.flush();
1450                 o.close();
1451             }
1452             if (b != null) {
1453                 b.close();
1454             }
1455         }
1456     }
1457
1458 }