Merge "Bug-2842:If an install snapshot is in progress, Follower should return immedia...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
58 import org.opendaylight.controller.cluster.notifications.RoleChanged;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
64 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
66 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
67 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
68 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
69 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
71 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
73 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.duration.Duration;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class RaftActorTest extends AbstractActorTest {
83
84     private TestActorFactory factory;
85
86     @Before
87     public void setUp(){
88         factory = new TestActorFactory(getSystem());
89     }
90
91     @After
92     public void tearDown() throws Exception {
93         factory.close();
94         InMemoryJournal.clear();
95         InMemorySnapshotStore.clear();
96     }
97
98     public static class MockRaftActor extends RaftActor {
99
100         protected DataPersistenceProvider dataPersistenceProvider;
101         private final RaftActor delegate;
102         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
103         private final List<Object> state;
104         private ActorRef roleChangeNotifier;
105         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
106
107         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
108             private static final long serialVersionUID = 1L;
109             private final Map<String, String> peerAddresses;
110             private final String id;
111             private final Optional<ConfigParams> config;
112             private final DataPersistenceProvider dataPersistenceProvider;
113             private final ActorRef roleChangeNotifier;
114
115             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
116                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
117                 ActorRef roleChangeNotifier) {
118                 this.peerAddresses = peerAddresses;
119                 this.id = id;
120                 this.config = config;
121                 this.dataPersistenceProvider = dataPersistenceProvider;
122                 this.roleChangeNotifier = roleChangeNotifier;
123             }
124
125             @Override
126             public MockRaftActor create() throws Exception {
127                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
128                     dataPersistenceProvider);
129                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
130                 return mockRaftActor;
131             }
132         }
133
134         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
135                              DataPersistenceProvider dataPersistenceProvider) {
136             super(id, peerAddresses, config);
137             state = new ArrayList<>();
138             this.delegate = mock(RaftActor.class);
139             if(dataPersistenceProvider == null){
140                 this.dataPersistenceProvider = new PersistentDataProvider();
141             } else {
142                 this.dataPersistenceProvider = dataPersistenceProvider;
143             }
144         }
145
146         public void waitForRecoveryComplete() {
147             try {
148                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
149             } catch (InterruptedException e) {
150                 e.printStackTrace();
151             }
152         }
153
154         public void waitForInitializeBehaviorComplete() {
155             try {
156                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
157             } catch (InterruptedException e) {
158                 e.printStackTrace();
159             }
160         }
161
162         public List<Object> getState() {
163             return state;
164         }
165
166         public static Props props(final String id, final Map<String, String> peerAddresses,
167                 Optional<ConfigParams> config){
168             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
169         }
170
171         public static Props props(final String id, final Map<String, String> peerAddresses,
172                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
173             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
174         }
175
176         public static Props props(final String id, final Map<String, String> peerAddresses,
177             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
178             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
179         }
180
181         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
182             delegate.applyState(clientActor, identifier, data);
183             LOG.info("{}: applyState called", persistenceId());
184         }
185
186         @Override
187         protected void startLogRecoveryBatch(int maxBatchSize) {
188         }
189
190         @Override
191         protected void appendRecoveredLogEntry(Payload data) {
192             state.add(data);
193         }
194
195         @Override
196         protected void applyCurrentLogRecoveryBatch() {
197         }
198
199         @Override
200         protected void onRecoveryComplete() {
201             delegate.onRecoveryComplete();
202             recoveryComplete.countDown();
203         }
204
205         @Override
206         protected void initializeBehavior() {
207             super.initializeBehavior();
208             initializeBehaviorComplete.countDown();
209         }
210
211         @Override
212         protected void applyRecoverySnapshot(byte[] bytes) {
213             delegate.applyRecoverySnapshot(bytes);
214             try {
215                 Object data = toObject(bytes);
216                 if (data instanceof List) {
217                     state.addAll((List<?>) data);
218                 }
219             } catch (Exception e) {
220                 e.printStackTrace();
221             }
222         }
223
224         @Override protected void createSnapshot() {
225             LOG.info("{}: createSnapshot called", persistenceId());
226             delegate.createSnapshot();
227         }
228
229         @Override protected void applySnapshot(byte [] snapshot) {
230             LOG.info("{}: applySnapshot called", persistenceId());
231             delegate.applySnapshot(snapshot);
232         }
233
234         @Override protected void onStateChanged() {
235             delegate.onStateChanged();
236         }
237
238         @Override
239         protected DataPersistenceProvider persistence() {
240             return this.dataPersistenceProvider;
241         }
242
243         @Override
244         protected Optional<ActorRef> getRoleChangeNotifier() {
245             return Optional.fromNullable(roleChangeNotifier);
246         }
247
248         @Override public String persistenceId() {
249             return this.getId();
250         }
251
252         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
253             Object obj = null;
254             ByteArrayInputStream bis = null;
255             ObjectInputStream ois = null;
256             try {
257                 bis = new ByteArrayInputStream(bs);
258                 ois = new ObjectInputStream(bis);
259                 obj = ois.readObject();
260             } finally {
261                 if (bis != null) {
262                     bis.close();
263                 }
264                 if (ois != null) {
265                     ois.close();
266                 }
267             }
268             return obj;
269         }
270
271         public ReplicatedLog getReplicatedLog(){
272             return this.getRaftActorContext().getReplicatedLog();
273         }
274
275     }
276
277
278     public static class RaftActorTestKit extends JavaTestKit {
279         private final ActorRef raftActor;
280
281         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
282             super(actorSystem);
283
284             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
285                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
286
287         }
288
289
290         public ActorRef getRaftActor() {
291             return raftActor;
292         }
293
294         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
295             // Wait for a specific log message to show up
296             return
297                 new JavaTestKit.EventFilter<Boolean>(logEventClass
298                 ) {
299                     @Override
300                     protected Boolean run() {
301                         return true;
302                     }
303                 }.from(raftActor.path().toString())
304                     .message(message)
305                     .occurrences(1).exec();
306
307
308         }
309
310         protected void waitUntilLeader(){
311             waitUntilLeader(raftActor);
312         }
313
314         public static void waitUntilLeader(ActorRef actorRef) {
315             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
316             for(int i = 0; i < 20 * 5; i++) {
317                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
318                 try {
319                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
320                     if(resp.getLeaderActor() != null) {
321                         return;
322                     }
323                 } catch(TimeoutException e) {
324                 } catch(Exception e) {
325                     System.err.println("FindLeader threw ex");
326                     e.printStackTrace();
327                 }
328
329
330                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
331             }
332
333             Assert.fail("Leader not found for actorRef " + actorRef.path());
334         }
335
336     }
337
338
339     @Test
340     public void testConstruction() {
341         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
342     }
343
344     @Test
345     public void testFindLeaderWhenLeaderIsSelf(){
346         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
347         kit.waitUntilLeader();
348     }
349
350     @Test
351     public void testRaftActorRecovery() throws Exception {
352         new JavaTestKit(getSystem()) {{
353             String persistenceId = factory.generateActorId("follower-");
354
355             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
356             // Set the heartbeat interval high to essentially disable election otherwise the test
357             // may fail if the actor is switched to Leader and the commitIndex is set to the last
358             // log entry.
359             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
360
361             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
362                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
363
364             watch(followerActor);
365
366             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
367             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
368                     new MockRaftActorContext.MockPayload("E"));
369             snapshotUnappliedEntries.add(entry1);
370
371             int lastAppliedDuringSnapshotCapture = 3;
372             int lastIndexDuringSnapshotCapture = 4;
373
374             // 4 messages as part of snapshot, which are applied to state
375             ByteString snapshotBytes = fromObject(Arrays.asList(
376                     new MockRaftActorContext.MockPayload("A"),
377                     new MockRaftActorContext.MockPayload("B"),
378                     new MockRaftActorContext.MockPayload("C"),
379                     new MockRaftActorContext.MockPayload("D")));
380
381             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
382                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
383                     lastAppliedDuringSnapshotCapture, 1);
384             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
385
386             // add more entries after snapshot is taken
387             List<ReplicatedLogEntry> entries = new ArrayList<>();
388             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
389                     new MockRaftActorContext.MockPayload("F"));
390             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
391                     new MockRaftActorContext.MockPayload("G"));
392             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
393                     new MockRaftActorContext.MockPayload("H"));
394             entries.add(entry2);
395             entries.add(entry3);
396             entries.add(entry4);
397
398             int lastAppliedToState = 5;
399             int lastIndex = 7;
400
401             InMemoryJournal.addEntry(persistenceId, 5, entry2);
402             // 2 entries are applied to state besides the 4 entries in snapshot
403             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
404             InMemoryJournal.addEntry(persistenceId, 7, entry3);
405             InMemoryJournal.addEntry(persistenceId, 8, entry4);
406
407             // kill the actor
408             followerActor.tell(PoisonPill.getInstance(), null);
409             expectMsgClass(duration("5 seconds"), Terminated.class);
410
411             unwatch(followerActor);
412
413             //reinstate the actor
414             TestActorRef<MockRaftActor> ref = factory.createTestActor(
415                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
416                             Optional.<ConfigParams>of(config)));
417
418             ref.underlyingActor().waitForRecoveryComplete();
419
420             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
421             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
422                     context.getReplicatedLog().size());
423             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
424             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
425             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
426             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
427         }};
428     }
429
430     @Test
431     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
432         new JavaTestKit(getSystem()) {{
433             String persistenceId = factory.generateActorId("leader-");
434
435             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
436             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
437
438             // Setup the persisted journal with some entries
439             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
440                     new MockRaftActorContext.MockPayload("zero"));
441             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
442                     new MockRaftActorContext.MockPayload("oen"));
443             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
444                     new MockRaftActorContext.MockPayload("two"));
445
446             long seqNr = 1;
447             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
448             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
449             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
450             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
451
452             int lastAppliedToState = 1;
453             int lastIndex = 2;
454
455             //reinstate the actor
456             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
457                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
458                             Optional.<ConfigParams>of(config)));
459
460             leaderActor.underlyingActor().waitForRecoveryComplete();
461
462             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
463             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
464             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
465             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
466             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
467         }};
468     }
469
470     /**
471      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
472      * process recovery messages
473      *
474      * @throws Exception
475      */
476
477     @Test
478     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
479         new JavaTestKit(getSystem()) {
480             {
481                 String persistenceId = factory.generateActorId("leader-");
482
483                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
484
485                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
486
487                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
488                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
489
490                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
491
492                 // Wait for akka's recovery to complete so it doesn't interfere.
493                 mockRaftActor.waitForRecoveryComplete();
494
495                 ByteString snapshotBytes = fromObject(Arrays.asList(
496                         new MockRaftActorContext.MockPayload("A"),
497                         new MockRaftActorContext.MockPayload("B"),
498                         new MockRaftActorContext.MockPayload("C"),
499                         new MockRaftActorContext.MockPayload("D")));
500
501                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
502                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
503
504                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
505
506                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
507
508                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
509
510                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
511
512                 assertEquals("add replicated log entry", 1, replicatedLog.size());
513
514                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
515
516                 assertEquals("add replicated log entry", 2, replicatedLog.size());
517
518                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
519
520                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
521
522                 // The snapshot had 4 items + we added 2 more items during the test
523                 // We start removing from 5 and we should get 1 item in the replicated log
524                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
525
526                 assertEquals("remove log entries", 1, replicatedLog.size());
527
528                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
529
530                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
531                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
532
533                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
534
535             }};
536     }
537
538     /**
539      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
540      * not process recovery messages
541      *
542      * @throws Exception
543      */
544     @Test
545     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
546         new JavaTestKit(getSystem()) {
547             {
548                 String persistenceId = factory.generateActorId("leader-");
549
550                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
551
552                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
553
554                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
555                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
556
557                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
558
559                 // Wait for akka's recovery to complete so it doesn't interfere.
560                 mockRaftActor.waitForRecoveryComplete();
561
562                 ByteString snapshotBytes = fromObject(Arrays.asList(
563                         new MockRaftActorContext.MockPayload("A"),
564                         new MockRaftActorContext.MockPayload("B"),
565                         new MockRaftActorContext.MockPayload("C"),
566                         new MockRaftActorContext.MockPayload("D")));
567
568                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
569                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
570
571                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
572
573                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
574
575                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
576
577                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
578
579                 assertEquals("add replicated log entry", 0, replicatedLog.size());
580
581                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
582
583                 assertEquals("add replicated log entry", 0, replicatedLog.size());
584
585                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
586
587                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
588
589                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
590
591                 assertEquals("remove log entries", 0, replicatedLog.size());
592
593                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
594
595                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
596                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
597
598                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
599             }};
600     }
601
602
603     @Test
604     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
605         new JavaTestKit(getSystem()) {
606             {
607                 String persistenceId = factory.generateActorId("leader-");
608
609                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
610
611                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
612
613                 CountDownLatch persistLatch = new CountDownLatch(1);
614                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
615                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
616
617                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
618                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
619
620                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
621
622                 mockRaftActor.waitForInitializeBehaviorComplete();
623
624                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
625
626                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
627             }
628         };
629     }
630
631     @Test
632     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
633         new JavaTestKit(getSystem()) {
634             {
635                 String persistenceId = factory.generateActorId("leader-");
636
637                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
638
639                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
640
641                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
642
643                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
644                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
645
646                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
647
648                 mockRaftActor.waitForInitializeBehaviorComplete();
649
650                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
651
652                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
653
654                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
655             }
656         };
657     }
658
659     @Test
660     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
661         new JavaTestKit(getSystem()) {
662             {
663                 String persistenceId = factory.generateActorId("leader-");
664
665                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
666
667                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
668
669                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
670
671                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
672                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
673
674                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
675
676                 mockRaftActor.waitForInitializeBehaviorComplete();
677
678                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
679
680                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
681
682                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
683             }
684         };
685     }
686
687     @Test
688     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
689         new JavaTestKit(getSystem()) {
690             {
691                 String persistenceId = factory.generateActorId("leader-");
692
693                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
694
695                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
696
697                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
698
699                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
700                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
701
702                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
703
704                 mockRaftActor.waitForInitializeBehaviorComplete();
705
706                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
707
708                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
709
710             }
711
712         };
713     }
714
715     @Test
716     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
717         new JavaTestKit(getSystem()) {
718             {
719                 String persistenceId = factory.generateActorId("leader-");
720
721                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
722
723                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
724
725                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
726
727                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
728                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
729                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
730
731                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
732
733                 mockRaftActor.waitForInitializeBehaviorComplete();
734
735                 ByteString snapshotBytes = fromObject(Arrays.asList(
736                         new MockRaftActorContext.MockPayload("A"),
737                         new MockRaftActorContext.MockPayload("B"),
738                         new MockRaftActorContext.MockPayload("C"),
739                         new MockRaftActorContext.MockPayload("D")));
740
741                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
742
743                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
744
745                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
746
747                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
748
749                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
750
751             }
752         };
753     }
754
755     @Test
756     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
757         new JavaTestKit(getSystem()) {
758             {
759                 String persistenceId = factory.generateActorId("leader-");
760
761                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
762
763                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
764
765                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
766
767                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
768                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
769
770                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
771
772                 mockRaftActor.waitForInitializeBehaviorComplete();
773
774                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
775                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
776                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
777                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
778                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
779
780                 ByteString snapshotBytes = fromObject(Arrays.asList(
781                         new MockRaftActorContext.MockPayload("A"),
782                         new MockRaftActorContext.MockPayload("B"),
783                         new MockRaftActorContext.MockPayload("C"),
784                         new MockRaftActorContext.MockPayload("D")));
785
786                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
787                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
788
789                 long replicatedToAllIndex = 1;
790                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
791
792                 verify(mockRaftActor.delegate).createSnapshot();
793
794                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
795
796                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
797
798                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
799
800                 verify(dataPersistenceProvider).deleteMessages(100);
801
802                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
803                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
804
805                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
806                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
807                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
808
809                 // Index 2 will not be in the log because it was removed due to snapshotting
810                 assertNull(mockRaftActor.getReplicatedLog().get(1));
811                 assertNull(mockRaftActor.getReplicatedLog().get(0));
812
813             }
814         };
815     }
816
817     @Test
818     public void testApplyState() throws Exception {
819
820         new JavaTestKit(getSystem()) {
821             {
822                 String persistenceId = factory.generateActorId("leader-");
823
824                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
825
826                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
827
828                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
829
830                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
831                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
832
833                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
834
835                 mockRaftActor.waitForInitializeBehaviorComplete();
836
837                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
838                         new MockRaftActorContext.MockPayload("F"));
839
840                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
841
842                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
843
844             }
845         };
846     }
847
848     @Test
849     public void testApplySnapshot() throws Exception {
850         new JavaTestKit(getSystem()) {
851             {
852                 String persistenceId = factory.generateActorId("leader-");
853
854                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
855
856                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
857
858                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
859
860                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
861                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
862
863                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
864
865                 mockRaftActor.waitForInitializeBehaviorComplete();
866
867                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
868
869                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
870                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
871                 oldReplicatedLog.append(
872                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
873                                 mock(Payload.class)));
874
875                 ByteString snapshotBytes = fromObject(Arrays.asList(
876                         new MockRaftActorContext.MockPayload("A"),
877                         new MockRaftActorContext.MockPayload("B"),
878                         new MockRaftActorContext.MockPayload("C"),
879                         new MockRaftActorContext.MockPayload("D")));
880
881                 Snapshot snapshot = mock(Snapshot.class);
882
883                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
884
885                 doReturn(3L).when(snapshot).getLastAppliedIndex();
886
887                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
888
889                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
890
891                 assertTrue("The replicatedLog should have changed",
892                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
893
894                 assertEquals("lastApplied should be same as in the snapshot",
895                         (Long) 3L, mockRaftActor.getLastApplied());
896
897                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
898
899             }
900         };
901     }
902
903     @Test
904     public void testSaveSnapshotFailure() throws Exception {
905         new JavaTestKit(getSystem()) {
906             {
907                 String persistenceId = factory.generateActorId("leader-");
908
909                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
910
911                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
912
913                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
914
915                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
916                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
917
918                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
919
920                 mockRaftActor.waitForInitializeBehaviorComplete();
921
922                 ByteString snapshotBytes = fromObject(Arrays.asList(
923                         new MockRaftActorContext.MockPayload("A"),
924                         new MockRaftActorContext.MockPayload("B"),
925                         new MockRaftActorContext.MockPayload("C"),
926                         new MockRaftActorContext.MockPayload("D")));
927
928                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
929
930                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
931
932                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
933
934                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
935
936                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
937                         new Exception()));
938
939                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
940                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
941
942             }
943         };
944     }
945
946     @Test
947     public void testRaftRoleChangeNotifier() throws Exception {
948         new JavaTestKit(getSystem()) {{
949             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
950                     Props.create(MessageCollectorActor.class));
951             MessageCollectorActor.waitUntilReady(notifierActor);
952
953             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
954             long heartBeatInterval = 100;
955             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
956             config.setElectionTimeoutFactor(1);
957
958             String persistenceId = factory.generateActorId("notifier-");
959
960             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
961                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
962
963             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
964
965             // check if the notifier got a role change from null to Follower
966             RoleChanged raftRoleChanged = matches.get(0);
967             assertEquals(persistenceId, raftRoleChanged.getMemberId());
968             assertNull(raftRoleChanged.getOldRole());
969             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
970
971             // check if the notifier got a role change from Follower to Candidate
972             raftRoleChanged = matches.get(1);
973             assertEquals(persistenceId, raftRoleChanged.getMemberId());
974             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
975             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
976
977             // check if the notifier got a role change from Candidate to Leader
978             raftRoleChanged = matches.get(2);
979             assertEquals(persistenceId, raftRoleChanged.getMemberId());
980             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
981             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
982
983             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
984                     notifierActor, LeaderStateChanged.class);
985
986             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
987
988             notifierActor.underlyingActor().clear();
989
990             MockRaftActor raftActor = raftActorRef.underlyingActor();
991             final String newLeaderId = "new-leader";
992             Follower follower = new Follower(raftActor.getRaftActorContext()) {
993                 @Override
994                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
995                     leaderId = newLeaderId;
996                     return this;
997                 }
998             };
999
1000             raftActor.changeCurrentBehavior(follower);
1001
1002             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1003             assertEquals(persistenceId, leaderStateChange.getMemberId());
1004             assertEquals(null, leaderStateChange.getLeaderId());
1005
1006             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1007             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1008             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1009
1010             notifierActor.underlyingActor().clear();
1011
1012             raftActor.handleCommand("any");
1013
1014             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1015             assertEquals(persistenceId, leaderStateChange.getMemberId());
1016             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1017         }};
1018     }
1019
1020     @Test
1021     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1022         new JavaTestKit(getSystem()) {
1023             {
1024                 String persistenceId = factory.generateActorId("leader-");
1025                 String follower1Id = factory.generateActorId("follower-");
1026
1027                 ActorRef followerActor1 =
1028                         factory.createActor(Props.create(MessageCollectorActor.class));
1029
1030                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1031                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1032                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1033
1034                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1035
1036                 Map<String, String> peerAddresses = new HashMap<>();
1037                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1038
1039                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1040                         MockRaftActor.props(persistenceId, peerAddresses,
1041                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1042
1043                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1044
1045                 leaderActor.getRaftActorContext().setCommitIndex(4);
1046                 leaderActor.getRaftActorContext().setLastApplied(4);
1047                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1048
1049                 leaderActor.waitForInitializeBehaviorComplete();
1050
1051                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1052
1053                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1054                 leaderActor.setCurrentBehavior(leader);
1055                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1056
1057                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1058                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1059
1060                 assertEquals(8, leaderActor.getReplicatedLog().size());
1061
1062                 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
1063
1064                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1065                 verify(leaderActor.delegate).createSnapshot();
1066
1067                 assertEquals(8, leaderActor.getReplicatedLog().size());
1068
1069                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1070                 //fake snapshot on index 5
1071                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1072
1073                 assertEquals(8, leaderActor.getReplicatedLog().size());
1074
1075                 //fake snapshot on index 6
1076                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1077                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1078                 assertEquals(8, leaderActor.getReplicatedLog().size());
1079
1080                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1081
1082                 assertEquals(8, leaderActor.getReplicatedLog().size());
1083
1084                 ByteString snapshotBytes = fromObject(Arrays.asList(
1085                         new MockRaftActorContext.MockPayload("foo-0"),
1086                         new MockRaftActorContext.MockPayload("foo-1"),
1087                         new MockRaftActorContext.MockPayload("foo-2"),
1088                         new MockRaftActorContext.MockPayload("foo-3"),
1089                         new MockRaftActorContext.MockPayload("foo-4")));
1090                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1091                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1092
1093                 // capture snapshot reply should remove the snapshotted entries only
1094                 assertEquals(3, leaderActor.getReplicatedLog().size());
1095                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1096
1097                 // add another non-replicated entry
1098                 leaderActor.getReplicatedLog().append(
1099                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1100
1101                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1102                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1103                 assertEquals(2, leaderActor.getReplicatedLog().size());
1104                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1105
1106             }
1107         };
1108     }
1109
1110     @Test
1111     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1112         new JavaTestKit(getSystem()) {
1113             {
1114                 String persistenceId = factory.generateActorId("follower-");
1115                 String leaderId = factory.generateActorId("leader-");
1116
1117
1118                 ActorRef leaderActor1 =
1119                         factory.createActor(Props.create(MessageCollectorActor.class));
1120
1121                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1122                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1123                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1124
1125                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1126
1127                 Map<String, String> peerAddresses = new HashMap<>();
1128                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1129
1130                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1131                         MockRaftActor.props(persistenceId, peerAddresses,
1132                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1133
1134                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1135                 followerActor.getRaftActorContext().setCommitIndex(4);
1136                 followerActor.getRaftActorContext().setLastApplied(4);
1137                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1138
1139                 followerActor.waitForInitializeBehaviorComplete();
1140
1141
1142                 Follower follower = new Follower(followerActor.getRaftActorContext());
1143                 followerActor.setCurrentBehavior(follower);
1144                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1145
1146                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1147                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1148                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1149
1150                 // log has indices 0-5
1151                 assertEquals(6, followerActor.getReplicatedLog().size());
1152
1153                 //snapshot on 4
1154                 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1155
1156                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1157                 verify(followerActor.delegate).createSnapshot();
1158
1159                 assertEquals(6, followerActor.getReplicatedLog().size());
1160
1161                 //fake snapshot on index 6
1162                 List<ReplicatedLogEntry> entries =
1163                         Arrays.asList(
1164                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1165                                         new MockRaftActorContext.MockPayload("foo-6"))
1166                         );
1167                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1168                 assertEquals(7, followerActor.getReplicatedLog().size());
1169
1170                 //fake snapshot on index 7
1171                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1172
1173                 entries =
1174                         Arrays.asList(
1175                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1176                                         new MockRaftActorContext.MockPayload("foo-7"))
1177                         );
1178                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1179                 assertEquals(8, followerActor.getReplicatedLog().size());
1180
1181                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1182
1183
1184                 ByteString snapshotBytes = fromObject(Arrays.asList(
1185                         new MockRaftActorContext.MockPayload("foo-0"),
1186                         new MockRaftActorContext.MockPayload("foo-1"),
1187                         new MockRaftActorContext.MockPayload("foo-2"),
1188                         new MockRaftActorContext.MockPayload("foo-3"),
1189                         new MockRaftActorContext.MockPayload("foo-4")));
1190                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1191                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1192
1193                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1194                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1195                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1196
1197                 entries =
1198                         Arrays.asList(
1199                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1200                                         new MockRaftActorContext.MockPayload("foo-7"))
1201                         );
1202                 // send an additional entry 8 with leaderCommit = 7
1203                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1204
1205                 // 7 and 8, as lastapplied is 7
1206                 assertEquals(2, followerActor.getReplicatedLog().size());
1207
1208             }
1209         };
1210     }
1211
1212     @Test
1213     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1214         new JavaTestKit(getSystem()) {
1215             {
1216                 String persistenceId = factory.generateActorId("leader-");
1217                 String follower1Id = factory.generateActorId("follower-");
1218                 String follower2Id = factory.generateActorId("follower-");
1219
1220                 ActorRef followerActor1 =
1221                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1222                 ActorRef followerActor2 =
1223                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1224
1225                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1226                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1227                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1228
1229                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1230
1231                 Map<String, String> peerAddresses = new HashMap<>();
1232                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1233                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1234
1235                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1236                         MockRaftActor.props(persistenceId, peerAddresses,
1237                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1238
1239                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1240                 leaderActor.getRaftActorContext().setCommitIndex(9);
1241                 leaderActor.getRaftActorContext().setLastApplied(9);
1242                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1243
1244                 leaderActor.waitForInitializeBehaviorComplete();
1245
1246                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1247                 leaderActor.setCurrentBehavior(leader);
1248                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1249
1250                 // create 5 entries in the log
1251                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1252                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1253
1254                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1255                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1256                 //setting replicatedToAllIndex = 9, for the log to clear
1257                 leader.setReplicatedToAllIndex(9);
1258                 assertEquals(5, leaderActor.getReplicatedLog().size());
1259                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1260
1261                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1262                 assertEquals(5, leaderActor.getReplicatedLog().size());
1263                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1264
1265                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1266                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1267                 assertEquals(5, leaderActor.getReplicatedLog().size());
1268                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1269
1270                 // simulate a real snapshot
1271                 leaderActor.onReceiveCommand(new SendHeartBeat());
1272                 assertEquals(5, leaderActor.getReplicatedLog().size());
1273                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1274                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1275                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1276
1277
1278                 //reply from a slow follower does not initiate a fake snapshot
1279                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1280                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1281
1282                 ByteString snapshotBytes = fromObject(Arrays.asList(
1283                         new MockRaftActorContext.MockPayload("foo-0"),
1284                         new MockRaftActorContext.MockPayload("foo-1"),
1285                         new MockRaftActorContext.MockPayload("foo-2"),
1286                         new MockRaftActorContext.MockPayload("foo-3"),
1287                         new MockRaftActorContext.MockPayload("foo-4")));
1288                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1289                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1290
1291                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1292
1293                 //reply from a slow follower after should not raise errors
1294                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1295                 assertEquals(0, leaderActor.getReplicatedLog().size());
1296             }
1297         };
1298     }
1299
1300
1301     private static class NonPersistentProvider implements DataPersistenceProvider {
1302         @Override
1303         public boolean isRecoveryApplicable() {
1304             return false;
1305         }
1306
1307         @Override
1308         public <T> void persist(T o, Procedure<T> procedure) {
1309             try {
1310                 procedure.apply(o);
1311             } catch (Exception e) {
1312                 e.printStackTrace();
1313             }
1314         }
1315
1316         @Override
1317         public void saveSnapshot(Object o) {
1318
1319         }
1320
1321         @Override
1322         public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1323
1324         }
1325
1326         @Override
1327         public void deleteMessages(long sequenceNumber) {
1328
1329         }
1330     }
1331
1332     @Test
1333     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1334         new JavaTestKit(getSystem()) {{
1335             String persistenceId = factory.generateActorId("leader-");
1336             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1337             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1338             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1339             config.setSnapshotBatchCount(5);
1340
1341             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1342
1343             Map<String, String> peerAddresses = new HashMap<>();
1344
1345             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1346                     MockRaftActor.props(persistenceId, peerAddresses,
1347                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1348
1349             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1350             leaderActor.getRaftActorContext().setCommitIndex(3);
1351             leaderActor.getRaftActorContext().setLastApplied(3);
1352             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1353
1354             leaderActor.waitForInitializeBehaviorComplete();
1355             for(int i=0;i< 4;i++) {
1356                 leaderActor.getReplicatedLog()
1357                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1358                                 new MockRaftActorContext.MockPayload("A")));
1359             }
1360
1361             Leader leader = new Leader(leaderActor.getRaftActorContext());
1362             leaderActor.setCurrentBehavior(leader);
1363             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1364
1365             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1366             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1367
1368             // Now send a CaptureSnapshotReply
1369             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1370
1371             // Trimming log in this scenario is a no-op
1372             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1373             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1374             assertEquals(-1, leader.getReplicatedToAllIndex());
1375
1376         }};
1377     }
1378
1379     @Test
1380     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1381         new JavaTestKit(getSystem()) {{
1382             String persistenceId = factory.generateActorId("leader-");
1383             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1384             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1385             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1386             config.setSnapshotBatchCount(5);
1387
1388             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1389
1390             Map<String, String> peerAddresses = new HashMap<>();
1391
1392             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1393                     MockRaftActor.props(persistenceId, peerAddresses,
1394                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1395
1396             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1397             leaderActor.getRaftActorContext().setCommitIndex(3);
1398             leaderActor.getRaftActorContext().setLastApplied(3);
1399             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1400             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1401
1402             leaderActor.waitForInitializeBehaviorComplete();
1403             Leader leader = new Leader(leaderActor.getRaftActorContext());
1404             leaderActor.setCurrentBehavior(leader);
1405             leader.setReplicatedToAllIndex(3);
1406             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1407
1408             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1409             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1410
1411             // Now send a CaptureSnapshotReply
1412             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1413
1414             // Trimming log in this scenario is a no-op
1415             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1416             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1417             assertEquals(3, leader.getReplicatedToAllIndex());
1418
1419         }};
1420     }
1421
1422     private ByteString fromObject(Object snapshot) throws Exception {
1423         ByteArrayOutputStream b = null;
1424         ObjectOutputStream o = null;
1425         try {
1426             b = new ByteArrayOutputStream();
1427             o = new ObjectOutputStream(b);
1428             o.writeObject(snapshot);
1429             byte[] snapshotBytes = b.toByteArray();
1430             return ByteString.copyFrom(snapshotBytes);
1431         } finally {
1432             if (o != null) {
1433                 o.flush();
1434                 o.close();
1435             }
1436             if (b != null) {
1437                 b.close();
1438             }
1439         }
1440     }
1441
1442 }