Add RaftActor integration tests
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.RoleChanged;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
62 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
68 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
70 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
71 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.Duration;
78 import scala.concurrent.duration.FiniteDuration;
79
80 public class RaftActorTest extends AbstractActorTest {
81
82     private TestActorFactory factory;
83
84     @Before
85     public void setUp(){
86         factory = new TestActorFactory(getSystem());
87     }
88
89     @After
90     public void tearDown() throws Exception {
91         factory.close();
92         InMemoryJournal.clear();
93         InMemorySnapshotStore.clear();
94     }
95
96     public static class MockRaftActor extends RaftActor {
97
98         protected DataPersistenceProvider dataPersistenceProvider;
99         private final RaftActor delegate;
100         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
101         private final List<Object> state;
102         private ActorRef roleChangeNotifier;
103         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
104
105         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
106             private static final long serialVersionUID = 1L;
107             private final Map<String, String> peerAddresses;
108             private final String id;
109             private final Optional<ConfigParams> config;
110             private final DataPersistenceProvider dataPersistenceProvider;
111             private final ActorRef roleChangeNotifier;
112
113             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
114                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
115                 ActorRef roleChangeNotifier) {
116                 this.peerAddresses = peerAddresses;
117                 this.id = id;
118                 this.config = config;
119                 this.dataPersistenceProvider = dataPersistenceProvider;
120                 this.roleChangeNotifier = roleChangeNotifier;
121             }
122
123             @Override
124             public MockRaftActor create() throws Exception {
125                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
126                     dataPersistenceProvider);
127                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
128                 return mockRaftActor;
129             }
130         }
131
132         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
133                              DataPersistenceProvider dataPersistenceProvider) {
134             super(id, peerAddresses, config);
135             state = new ArrayList<>();
136             this.delegate = mock(RaftActor.class);
137             if(dataPersistenceProvider == null){
138                 this.dataPersistenceProvider = new PersistentDataProvider();
139             } else {
140                 this.dataPersistenceProvider = dataPersistenceProvider;
141             }
142         }
143
144         public void waitForRecoveryComplete() {
145             try {
146                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
147             } catch (InterruptedException e) {
148                 e.printStackTrace();
149             }
150         }
151
152         public void waitForInitializeBehaviorComplete() {
153             try {
154                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
155             } catch (InterruptedException e) {
156                 e.printStackTrace();
157             }
158         }
159
160         public List<Object> getState() {
161             return state;
162         }
163
164         public static Props props(final String id, final Map<String, String> peerAddresses,
165                 Optional<ConfigParams> config){
166             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
167         }
168
169         public static Props props(final String id, final Map<String, String> peerAddresses,
170                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
171             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
172         }
173
174         public static Props props(final String id, final Map<String, String> peerAddresses,
175             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
176             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
177         }
178
179         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
180             delegate.applyState(clientActor, identifier, data);
181             LOG.info("{}: applyState called", persistenceId());
182         }
183
184         @Override
185         protected void startLogRecoveryBatch(int maxBatchSize) {
186         }
187
188         @Override
189         protected void appendRecoveredLogEntry(Payload data) {
190             state.add(data);
191         }
192
193         @Override
194         protected void applyCurrentLogRecoveryBatch() {
195         }
196
197         @Override
198         protected void onRecoveryComplete() {
199             delegate.onRecoveryComplete();
200             recoveryComplete.countDown();
201         }
202
203         @Override
204         protected void initializeBehavior() {
205             super.initializeBehavior();
206             initializeBehaviorComplete.countDown();
207         }
208
209         @Override
210         protected void applyRecoverySnapshot(byte[] bytes) {
211             delegate.applyRecoverySnapshot(bytes);
212             try {
213                 Object data = toObject(bytes);
214                 if (data instanceof List) {
215                     state.addAll((List<?>) data);
216                 }
217             } catch (Exception e) {
218                 e.printStackTrace();
219             }
220         }
221
222         @Override protected void createSnapshot() {
223             LOG.info("{}: createSnapshot called", persistenceId());
224             delegate.createSnapshot();
225         }
226
227         @Override protected void applySnapshot(byte [] snapshot) {
228             LOG.info("{}: applySnapshot called", persistenceId());
229             delegate.applySnapshot(snapshot);
230         }
231
232         @Override protected void onStateChanged() {
233             delegate.onStateChanged();
234         }
235
236         @Override
237         protected DataPersistenceProvider persistence() {
238             return this.dataPersistenceProvider;
239         }
240
241         @Override
242         protected Optional<ActorRef> getRoleChangeNotifier() {
243             return Optional.fromNullable(roleChangeNotifier);
244         }
245
246         @Override public String persistenceId() {
247             return this.getId();
248         }
249
250         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
251             Object obj = null;
252             ByteArrayInputStream bis = null;
253             ObjectInputStream ois = null;
254             try {
255                 bis = new ByteArrayInputStream(bs);
256                 ois = new ObjectInputStream(bis);
257                 obj = ois.readObject();
258             } finally {
259                 if (bis != null) {
260                     bis.close();
261                 }
262                 if (ois != null) {
263                     ois.close();
264                 }
265             }
266             return obj;
267         }
268
269         public ReplicatedLog getReplicatedLog(){
270             return this.getRaftActorContext().getReplicatedLog();
271         }
272
273     }
274
275
276     public static class RaftActorTestKit extends JavaTestKit {
277         private final ActorRef raftActor;
278
279         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
280             super(actorSystem);
281
282             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
283                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
284
285         }
286
287
288         public ActorRef getRaftActor() {
289             return raftActor;
290         }
291
292         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
293             // Wait for a specific log message to show up
294             return
295                 new JavaTestKit.EventFilter<Boolean>(logEventClass
296                 ) {
297                     @Override
298                     protected Boolean run() {
299                         return true;
300                     }
301                 }.from(raftActor.path().toString())
302                     .message(message)
303                     .occurrences(1).exec();
304
305
306         }
307
308         protected void waitUntilLeader(){
309             waitUntilLeader(raftActor);
310         }
311
312         public static void waitUntilLeader(ActorRef actorRef) {
313             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
314             for(int i = 0; i < 20 * 5; i++) {
315                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
316                 try {
317                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
318                     if(resp.getLeaderActor() != null) {
319                         return;
320                     }
321                 } catch(TimeoutException e) {
322                 } catch(Exception e) {
323                     System.err.println("FindLeader threw ex");
324                     e.printStackTrace();
325                 }
326
327
328                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
329             }
330
331             Assert.fail("Leader not found for actorRef " + actorRef.path());
332         }
333
334     }
335
336
337     @Test
338     public void testConstruction() {
339         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
340     }
341
342     @Test
343     public void testFindLeaderWhenLeaderIsSelf(){
344         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
345         kit.waitUntilLeader();
346     }
347
348     @Test
349     public void testRaftActorRecovery() throws Exception {
350         new JavaTestKit(getSystem()) {{
351             String persistenceId = factory.generateActorId("follower-");
352
353             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
354             // Set the heartbeat interval high to essentially disable election otherwise the test
355             // may fail if the actor is switched to Leader and the commitIndex is set to the last
356             // log entry.
357             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
358
359             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
360                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
361
362             watch(followerActor);
363
364             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
365             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
366                     new MockRaftActorContext.MockPayload("E"));
367             snapshotUnappliedEntries.add(entry1);
368
369             int lastAppliedDuringSnapshotCapture = 3;
370             int lastIndexDuringSnapshotCapture = 4;
371
372             // 4 messages as part of snapshot, which are applied to state
373             ByteString snapshotBytes = fromObject(Arrays.asList(
374                     new MockRaftActorContext.MockPayload("A"),
375                     new MockRaftActorContext.MockPayload("B"),
376                     new MockRaftActorContext.MockPayload("C"),
377                     new MockRaftActorContext.MockPayload("D")));
378
379             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
380                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
381                     lastAppliedDuringSnapshotCapture, 1);
382             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
383
384             // add more entries after snapshot is taken
385             List<ReplicatedLogEntry> entries = new ArrayList<>();
386             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
387                     new MockRaftActorContext.MockPayload("F"));
388             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
389                     new MockRaftActorContext.MockPayload("G"));
390             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
391                     new MockRaftActorContext.MockPayload("H"));
392             entries.add(entry2);
393             entries.add(entry3);
394             entries.add(entry4);
395
396             int lastAppliedToState = 5;
397             int lastIndex = 7;
398
399             InMemoryJournal.addEntry(persistenceId, 5, entry2);
400             // 2 entries are applied to state besides the 4 entries in snapshot
401             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
402             InMemoryJournal.addEntry(persistenceId, 7, entry3);
403             InMemoryJournal.addEntry(persistenceId, 8, entry4);
404
405             // kill the actor
406             followerActor.tell(PoisonPill.getInstance(), null);
407             expectMsgClass(duration("5 seconds"), Terminated.class);
408
409             unwatch(followerActor);
410
411             //reinstate the actor
412             TestActorRef<MockRaftActor> ref = factory.createTestActor(
413                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
414                             Optional.<ConfigParams>of(config)));
415
416             ref.underlyingActor().waitForRecoveryComplete();
417
418             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
419             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
420                     context.getReplicatedLog().size());
421             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
422             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
423             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
424             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
425         }};
426     }
427
428     @Test
429     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
430         new JavaTestKit(getSystem()) {{
431             String persistenceId = factory.generateActorId("leader-");
432
433             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
434             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
435
436             // Setup the persisted journal with some entries
437             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
438                     new MockRaftActorContext.MockPayload("zero"));
439             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
440                     new MockRaftActorContext.MockPayload("oen"));
441             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
442                     new MockRaftActorContext.MockPayload("two"));
443
444             long seqNr = 1;
445             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
446             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
447             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
448             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
449
450             int lastAppliedToState = 1;
451             int lastIndex = 2;
452
453             //reinstate the actor
454             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
455                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
456                             Optional.<ConfigParams>of(config)));
457
458             leaderActor.underlyingActor().waitForRecoveryComplete();
459
460             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
461             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
462             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
463             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
464             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
465         }};
466     }
467
468     /**
469      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
470      * process recovery messages
471      *
472      * @throws Exception
473      */
474
475     @Test
476     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
477         new JavaTestKit(getSystem()) {
478             {
479                 String persistenceId = factory.generateActorId("leader-");
480
481                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
482
483                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
484
485                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
486                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
487
488                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
489
490                 // Wait for akka's recovery to complete so it doesn't interfere.
491                 mockRaftActor.waitForRecoveryComplete();
492
493                 ByteString snapshotBytes = fromObject(Arrays.asList(
494                         new MockRaftActorContext.MockPayload("A"),
495                         new MockRaftActorContext.MockPayload("B"),
496                         new MockRaftActorContext.MockPayload("C"),
497                         new MockRaftActorContext.MockPayload("D")));
498
499                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
500                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
501
502                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
503
504                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
505
506                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
507
508                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
509
510                 assertEquals("add replicated log entry", 1, replicatedLog.size());
511
512                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
513
514                 assertEquals("add replicated log entry", 2, replicatedLog.size());
515
516                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
517
518                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
519
520                 // The snapshot had 4 items + we added 2 more items during the test
521                 // We start removing from 5 and we should get 1 item in the replicated log
522                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
523
524                 assertEquals("remove log entries", 1, replicatedLog.size());
525
526                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
527
528                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
529                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
530
531                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
532
533             }};
534     }
535
536     /**
537      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
538      * not process recovery messages
539      *
540      * @throws Exception
541      */
542     @Test
543     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
544         new JavaTestKit(getSystem()) {
545             {
546                 String persistenceId = factory.generateActorId("leader-");
547
548                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
549
550                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
551
552                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
553                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
554
555                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
556
557                 // Wait for akka's recovery to complete so it doesn't interfere.
558                 mockRaftActor.waitForRecoveryComplete();
559
560                 ByteString snapshotBytes = fromObject(Arrays.asList(
561                         new MockRaftActorContext.MockPayload("A"),
562                         new MockRaftActorContext.MockPayload("B"),
563                         new MockRaftActorContext.MockPayload("C"),
564                         new MockRaftActorContext.MockPayload("D")));
565
566                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
567                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
568
569                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
570
571                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
572
573                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
574
575                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
576
577                 assertEquals("add replicated log entry", 0, replicatedLog.size());
578
579                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
580
581                 assertEquals("add replicated log entry", 0, replicatedLog.size());
582
583                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
584
585                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
586
587                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
588
589                 assertEquals("remove log entries", 0, replicatedLog.size());
590
591                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
592
593                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
594                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
595
596                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
597             }};
598     }
599
600
601     @Test
602     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
603         new JavaTestKit(getSystem()) {
604             {
605                 String persistenceId = factory.generateActorId("leader-");
606
607                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
608
609                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
610
611                 CountDownLatch persistLatch = new CountDownLatch(1);
612                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
613                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
614
615                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
616                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
617
618                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
619
620                 mockRaftActor.waitForInitializeBehaviorComplete();
621
622                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
623
624                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
625             }
626         };
627     }
628
629     @Test
630     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
631         new JavaTestKit(getSystem()) {
632             {
633                 String persistenceId = factory.generateActorId("leader-");
634
635                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
636
637                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
638
639                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
640
641                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
642                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
643
644                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
645
646                 mockRaftActor.waitForInitializeBehaviorComplete();
647
648                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
649
650                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
651
652                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
653             }
654         };
655     }
656
657     @Test
658     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
659         new JavaTestKit(getSystem()) {
660             {
661                 String persistenceId = factory.generateActorId("leader-");
662
663                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
664
665                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
666
667                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
668
669                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
670                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
671
672                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
673
674                 mockRaftActor.waitForInitializeBehaviorComplete();
675
676                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
677
678                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
679
680                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
681             }
682         };
683     }
684
685     @Test
686     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
687         new JavaTestKit(getSystem()) {
688             {
689                 String persistenceId = factory.generateActorId("leader-");
690
691                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
692
693                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
694
695                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
696
697                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
698                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
699
700                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
701
702                 mockRaftActor.waitForInitializeBehaviorComplete();
703
704                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
705
706                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
707
708             }
709
710         };
711     }
712
713     @Test
714     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
715         new JavaTestKit(getSystem()) {
716             {
717                 String persistenceId = factory.generateActorId("leader-");
718
719                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
720
721                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
722
723                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
724
725                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
726                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
727                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
728
729                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
730
731                 mockRaftActor.waitForInitializeBehaviorComplete();
732
733                 ByteString snapshotBytes = fromObject(Arrays.asList(
734                         new MockRaftActorContext.MockPayload("A"),
735                         new MockRaftActorContext.MockPayload("B"),
736                         new MockRaftActorContext.MockPayload("C"),
737                         new MockRaftActorContext.MockPayload("D")));
738
739                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
740
741                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
742
743                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
744
745                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
746
747                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
748
749             }
750         };
751     }
752
753     @Test
754     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
755         new JavaTestKit(getSystem()) {
756             {
757                 String persistenceId = factory.generateActorId("leader-");
758
759                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
760
761                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
762
763                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
764
765                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
766                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
767
768                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
769
770                 mockRaftActor.waitForInitializeBehaviorComplete();
771
772                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
773                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
774                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
775                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
776                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
777
778                 ByteString snapshotBytes = fromObject(Arrays.asList(
779                         new MockRaftActorContext.MockPayload("A"),
780                         new MockRaftActorContext.MockPayload("B"),
781                         new MockRaftActorContext.MockPayload("C"),
782                         new MockRaftActorContext.MockPayload("D")));
783
784                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
785                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
786
787                 long replicatedToAllIndex = 1;
788                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
789
790                 verify(mockRaftActor.delegate).createSnapshot();
791
792                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
793
794                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
795
796                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
797
798                 verify(dataPersistenceProvider).deleteMessages(100);
799
800                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
801                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
802
803                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
804                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
805                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
806
807                 // Index 2 will not be in the log because it was removed due to snapshotting
808                 assertNull(mockRaftActor.getReplicatedLog().get(1));
809                 assertNull(mockRaftActor.getReplicatedLog().get(0));
810
811             }
812         };
813     }
814
815     @Test
816     public void testApplyState() throws Exception {
817
818         new JavaTestKit(getSystem()) {
819             {
820                 String persistenceId = factory.generateActorId("leader-");
821
822                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
823
824                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
825
826                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
827
828                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
829                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
830
831                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
832
833                 mockRaftActor.waitForInitializeBehaviorComplete();
834
835                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
836                         new MockRaftActorContext.MockPayload("F"));
837
838                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
839
840                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
841
842             }
843         };
844     }
845
846     @Test
847     public void testApplySnapshot() throws Exception {
848         new JavaTestKit(getSystem()) {
849             {
850                 String persistenceId = factory.generateActorId("leader-");
851
852                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
853
854                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
855
856                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
857
858                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
859                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
860
861                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
862
863                 mockRaftActor.waitForInitializeBehaviorComplete();
864
865                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
866
867                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
868                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
869                 oldReplicatedLog.append(
870                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
871                                 mock(Payload.class)));
872
873                 ByteString snapshotBytes = fromObject(Arrays.asList(
874                         new MockRaftActorContext.MockPayload("A"),
875                         new MockRaftActorContext.MockPayload("B"),
876                         new MockRaftActorContext.MockPayload("C"),
877                         new MockRaftActorContext.MockPayload("D")));
878
879                 Snapshot snapshot = mock(Snapshot.class);
880
881                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
882
883                 doReturn(3L).when(snapshot).getLastAppliedIndex();
884
885                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
886
887                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
888
889                 assertTrue("The replicatedLog should have changed",
890                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
891
892                 assertEquals("lastApplied should be same as in the snapshot",
893                         (Long) 3L, mockRaftActor.getLastApplied());
894
895                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
896
897             }
898         };
899     }
900
901     @Test
902     public void testSaveSnapshotFailure() throws Exception {
903         new JavaTestKit(getSystem()) {
904             {
905                 String persistenceId = factory.generateActorId("leader-");
906
907                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
908
909                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
910
911                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
912
913                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
914                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
915
916                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
917
918                 mockRaftActor.waitForInitializeBehaviorComplete();
919
920                 ByteString snapshotBytes = fromObject(Arrays.asList(
921                         new MockRaftActorContext.MockPayload("A"),
922                         new MockRaftActorContext.MockPayload("B"),
923                         new MockRaftActorContext.MockPayload("C"),
924                         new MockRaftActorContext.MockPayload("D")));
925
926                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
927
928                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
929
930                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
931
932                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
933
934                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
935                         new Exception()));
936
937                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
938                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
939
940             }
941         };
942     }
943
944     @Test
945     public void testRaftRoleChangeNotifier() throws Exception {
946         new JavaTestKit(getSystem()) {{
947             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
948             MessageCollectorActor.waitUntilReady(notifierActor);
949
950             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
951             long heartBeatInterval = 100;
952             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
953             config.setElectionTimeoutFactor(1);
954
955             String persistenceId = factory.generateActorId("notifier-");
956
957             factory.createTestActor(MockRaftActor.props(persistenceId,
958                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
959
960             List<RoleChanged> matches =  null;
961             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
962                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
963                 assertNotNull(matches);
964                 if(matches.size() == 3) {
965                     break;
966                 }
967                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
968             }
969
970             assertEquals(3, matches.size());
971
972             // check if the notifier got a role change from null to Follower
973             RoleChanged raftRoleChanged = matches.get(0);
974             assertEquals(persistenceId, raftRoleChanged.getMemberId());
975             assertNull(raftRoleChanged.getOldRole());
976             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
977
978             // check if the notifier got a role change from Follower to Candidate
979             raftRoleChanged = matches.get(1);
980             assertEquals(persistenceId, raftRoleChanged.getMemberId());
981             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
982             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
983
984             // check if the notifier got a role change from Candidate to Leader
985             raftRoleChanged = matches.get(2);
986             assertEquals(persistenceId, raftRoleChanged.getMemberId());
987             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
988             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
989         }};
990     }
991
992     @Test
993     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
994         new JavaTestKit(getSystem()) {
995             {
996                 String persistenceId = factory.generateActorId("leader-");
997                 String follower1Id = factory.generateActorId("follower-");
998
999                 ActorRef followerActor1 =
1000                         factory.createActor(Props.create(MessageCollectorActor.class));
1001
1002                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1003                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1004                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1005
1006                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1007
1008                 Map<String, String> peerAddresses = new HashMap<>();
1009                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1010
1011                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1012                         MockRaftActor.props(persistenceId, peerAddresses,
1013                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1014
1015                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1016
1017                 leaderActor.getRaftActorContext().setCommitIndex(4);
1018                 leaderActor.getRaftActorContext().setLastApplied(4);
1019                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1020
1021                 leaderActor.waitForInitializeBehaviorComplete();
1022
1023                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1024
1025                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1026                 leaderActor.setCurrentBehavior(leader);
1027                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1028
1029                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1030                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1031
1032                 assertEquals(8, leaderActor.getReplicatedLog().size());
1033
1034                 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
1035
1036                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1037                 verify(leaderActor.delegate).createSnapshot();
1038
1039                 assertEquals(8, leaderActor.getReplicatedLog().size());
1040
1041                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1042                 //fake snapshot on index 5
1043                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1044
1045                 assertEquals(8, leaderActor.getReplicatedLog().size());
1046
1047                 //fake snapshot on index 6
1048                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1049                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1050                 assertEquals(8, leaderActor.getReplicatedLog().size());
1051
1052                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1053
1054                 assertEquals(8, leaderActor.getReplicatedLog().size());
1055
1056                 ByteString snapshotBytes = fromObject(Arrays.asList(
1057                         new MockRaftActorContext.MockPayload("foo-0"),
1058                         new MockRaftActorContext.MockPayload("foo-1"),
1059                         new MockRaftActorContext.MockPayload("foo-2"),
1060                         new MockRaftActorContext.MockPayload("foo-3"),
1061                         new MockRaftActorContext.MockPayload("foo-4")));
1062                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1063                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1064
1065                 // capture snapshot reply should remove the snapshotted entries only
1066                 assertEquals(3, leaderActor.getReplicatedLog().size());
1067                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1068
1069                 // add another non-replicated entry
1070                 leaderActor.getReplicatedLog().append(
1071                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1072
1073                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1074                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1075                 assertEquals(2, leaderActor.getReplicatedLog().size());
1076                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1077
1078             }
1079         };
1080     }
1081
1082     @Test
1083     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1084         new JavaTestKit(getSystem()) {
1085             {
1086                 String persistenceId = factory.generateActorId("follower-");
1087                 String leaderId = factory.generateActorId("leader-");
1088
1089
1090                 ActorRef leaderActor1 =
1091                         factory.createActor(Props.create(MessageCollectorActor.class));
1092
1093                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1094                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1095                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1096
1097                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1098
1099                 Map<String, String> peerAddresses = new HashMap<>();
1100                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1101
1102                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1103                         MockRaftActor.props(persistenceId, peerAddresses,
1104                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1105
1106                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1107                 followerActor.getRaftActorContext().setCommitIndex(4);
1108                 followerActor.getRaftActorContext().setLastApplied(4);
1109                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1110
1111                 followerActor.waitForInitializeBehaviorComplete();
1112
1113
1114                 Follower follower = new Follower(followerActor.getRaftActorContext());
1115                 followerActor.setCurrentBehavior(follower);
1116                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1117
1118                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1119                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1120                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1121
1122                 // log has indices 0-5
1123                 assertEquals(6, followerActor.getReplicatedLog().size());
1124
1125                 //snapshot on 4
1126                 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1127
1128                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1129                 verify(followerActor.delegate).createSnapshot();
1130
1131                 assertEquals(6, followerActor.getReplicatedLog().size());
1132
1133                 //fake snapshot on index 6
1134                 List<ReplicatedLogEntry> entries =
1135                         Arrays.asList(
1136                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1137                                         new MockRaftActorContext.MockPayload("foo-6"))
1138                         );
1139                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1140                 assertEquals(7, followerActor.getReplicatedLog().size());
1141
1142                 //fake snapshot on index 7
1143                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1144
1145                 entries =
1146                         Arrays.asList(
1147                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1148                                         new MockRaftActorContext.MockPayload("foo-7"))
1149                         );
1150                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1151                 assertEquals(8, followerActor.getReplicatedLog().size());
1152
1153                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1154
1155
1156                 ByteString snapshotBytes = fromObject(Arrays.asList(
1157                         new MockRaftActorContext.MockPayload("foo-0"),
1158                         new MockRaftActorContext.MockPayload("foo-1"),
1159                         new MockRaftActorContext.MockPayload("foo-2"),
1160                         new MockRaftActorContext.MockPayload("foo-3"),
1161                         new MockRaftActorContext.MockPayload("foo-4")));
1162                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1163                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1164
1165                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1166                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1167                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1168
1169                 entries =
1170                         Arrays.asList(
1171                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1172                                         new MockRaftActorContext.MockPayload("foo-7"))
1173                         );
1174                 // send an additional entry 8 with leaderCommit = 7
1175                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1176
1177                 // 7 and 8, as lastapplied is 7
1178                 assertEquals(2, followerActor.getReplicatedLog().size());
1179
1180             }
1181         };
1182     }
1183
1184     @Test
1185     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1186         new JavaTestKit(getSystem()) {
1187             {
1188                 String persistenceId = factory.generateActorId("leader-");
1189                 String follower1Id = factory.generateActorId("follower-");
1190                 String follower2Id = factory.generateActorId("follower-");
1191
1192                 ActorRef followerActor1 =
1193                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1194                 ActorRef followerActor2 =
1195                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1196
1197                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1198                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1199                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1200
1201                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1202
1203                 Map<String, String> peerAddresses = new HashMap<>();
1204                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1205                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1206
1207                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1208                         MockRaftActor.props(persistenceId, peerAddresses,
1209                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1210
1211                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1212                 leaderActor.getRaftActorContext().setCommitIndex(9);
1213                 leaderActor.getRaftActorContext().setLastApplied(9);
1214                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1215
1216                 leaderActor.waitForInitializeBehaviorComplete();
1217
1218                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1219                 leaderActor.setCurrentBehavior(leader);
1220                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1221
1222                 // create 5 entries in the log
1223                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1224                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1225
1226                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1227                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1228                 //setting replicatedToAllIndex = 9, for the log to clear
1229                 leader.setReplicatedToAllIndex(9);
1230                 assertEquals(5, leaderActor.getReplicatedLog().size());
1231                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1232
1233                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1234                 assertEquals(5, leaderActor.getReplicatedLog().size());
1235                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1236
1237                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1238                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1239                 assertEquals(5, leaderActor.getReplicatedLog().size());
1240                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1241
1242                 // simulate a real snapshot
1243                 leaderActor.onReceiveCommand(new SendHeartBeat());
1244                 assertEquals(5, leaderActor.getReplicatedLog().size());
1245                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1246                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1247                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1248
1249
1250                 //reply from a slow follower does not initiate a fake snapshot
1251                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1252                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1253
1254                 ByteString snapshotBytes = fromObject(Arrays.asList(
1255                         new MockRaftActorContext.MockPayload("foo-0"),
1256                         new MockRaftActorContext.MockPayload("foo-1"),
1257                         new MockRaftActorContext.MockPayload("foo-2"),
1258                         new MockRaftActorContext.MockPayload("foo-3"),
1259                         new MockRaftActorContext.MockPayload("foo-4")));
1260                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1261                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1262
1263                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1264
1265                 //reply from a slow follower after should not raise errors
1266                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1267                 assertEquals(0, leaderActor.getReplicatedLog().size());
1268             }
1269         };
1270     }
1271
1272
1273     private static class NonPersistentProvider implements DataPersistenceProvider {
1274         @Override
1275         public boolean isRecoveryApplicable() {
1276             return false;
1277         }
1278
1279         @Override
1280         public <T> void persist(T o, Procedure<T> procedure) {
1281             try {
1282                 procedure.apply(o);
1283             } catch (Exception e) {
1284                 e.printStackTrace();
1285             }
1286         }
1287
1288         @Override
1289         public void saveSnapshot(Object o) {
1290
1291         }
1292
1293         @Override
1294         public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1295
1296         }
1297
1298         @Override
1299         public void deleteMessages(long sequenceNumber) {
1300
1301         }
1302     }
1303
1304     @Test
1305     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1306         new JavaTestKit(getSystem()) {{
1307             String persistenceId = factory.generateActorId("leader-");
1308             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1309             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1310             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1311             config.setSnapshotBatchCount(5);
1312
1313             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1314
1315             Map<String, String> peerAddresses = new HashMap<>();
1316
1317             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1318                     MockRaftActor.props(persistenceId, peerAddresses,
1319                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1320
1321             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1322             leaderActor.getRaftActorContext().setCommitIndex(3);
1323             leaderActor.getRaftActorContext().setLastApplied(3);
1324             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1325
1326             leaderActor.waitForInitializeBehaviorComplete();
1327             for(int i=0;i< 4;i++) {
1328                 leaderActor.getReplicatedLog()
1329                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1330                                 new MockRaftActorContext.MockPayload("A")));
1331             }
1332
1333             Leader leader = new Leader(leaderActor.getRaftActorContext());
1334             leaderActor.setCurrentBehavior(leader);
1335             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1336
1337             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1338             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1339
1340             // Now send a CaptureSnapshotReply
1341             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1342
1343             // Trimming log in this scenario is a no-op
1344             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1345             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1346             assertEquals(-1, leader.getReplicatedToAllIndex());
1347
1348         }};
1349     }
1350
1351     @Test
1352     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1353         new JavaTestKit(getSystem()) {{
1354             String persistenceId = factory.generateActorId("leader-");
1355             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1356             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1357             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1358             config.setSnapshotBatchCount(5);
1359
1360             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1361
1362             Map<String, String> peerAddresses = new HashMap<>();
1363
1364             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1365                     MockRaftActor.props(persistenceId, peerAddresses,
1366                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1367
1368             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1369             leaderActor.getRaftActorContext().setCommitIndex(3);
1370             leaderActor.getRaftActorContext().setLastApplied(3);
1371             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1372             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1373
1374             leaderActor.waitForInitializeBehaviorComplete();
1375             Leader leader = new Leader(leaderActor.getRaftActorContext());
1376             leaderActor.setCurrentBehavior(leader);
1377             leader.setReplicatedToAllIndex(3);
1378             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1379
1380             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1381             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1382
1383             // Now send a CaptureSnapshotReply
1384             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1385
1386             // Trimming log in this scenario is a no-op
1387             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1388             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1389             assertEquals(3, leader.getReplicatedToAllIndex());
1390
1391         }};
1392     }
1393
1394     private ByteString fromObject(Object snapshot) throws Exception {
1395         ByteArrayOutputStream b = null;
1396         ObjectOutputStream o = null;
1397         try {
1398             b = new ByteArrayOutputStream();
1399             o = new ObjectOutputStream(b);
1400             o.writeObject(snapshot);
1401             byte[] snapshotBytes = b.toByteArray();
1402             return ByteString.copyFrom(snapshotBytes);
1403         } finally {
1404             if (o != null) {
1405                 o.flush();
1406                 o.close();
1407             }
1408             if (b != null) {
1409                 b.close();
1410             }
1411         }
1412     }
1413
1414 }