Refactor InMemoryJournal and InMemorySnapshot to sal-akka-raft
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.RoleChanged;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
62 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
68 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
70 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
71 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.Duration;
78 import scala.concurrent.duration.FiniteDuration;
79
80 public class RaftActorTest extends AbstractActorTest {
81
82     private TestActorFactory factory;
83
84     @Before
85     public void setUp(){
86         factory = new TestActorFactory(getSystem());
87     }
88
89     @After
90     public void tearDown() throws Exception {
91         factory.close();
92         InMemoryJournal.clear();
93         InMemorySnapshotStore.clear();
94     }
95
96     public static class MockRaftActor extends RaftActor {
97
98         private final DataPersistenceProvider dataPersistenceProvider;
99         private final RaftActor delegate;
100         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
101         private final List<Object> state;
102         private ActorRef roleChangeNotifier;
103         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
104
105         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
106             private static final long serialVersionUID = 1L;
107             private final Map<String, String> peerAddresses;
108             private final String id;
109             private final Optional<ConfigParams> config;
110             private final DataPersistenceProvider dataPersistenceProvider;
111             private final ActorRef roleChangeNotifier;
112
113             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
114                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
115                 ActorRef roleChangeNotifier) {
116                 this.peerAddresses = peerAddresses;
117                 this.id = id;
118                 this.config = config;
119                 this.dataPersistenceProvider = dataPersistenceProvider;
120                 this.roleChangeNotifier = roleChangeNotifier;
121             }
122
123             @Override
124             public MockRaftActor create() throws Exception {
125                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
126                     dataPersistenceProvider);
127                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
128                 return mockRaftActor;
129             }
130         }
131
132         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
133                              DataPersistenceProvider dataPersistenceProvider) {
134             super(id, peerAddresses, config);
135             state = new ArrayList<>();
136             this.delegate = mock(RaftActor.class);
137             if(dataPersistenceProvider == null){
138                 this.dataPersistenceProvider = new PersistentDataProvider();
139             } else {
140                 this.dataPersistenceProvider = dataPersistenceProvider;
141             }
142         }
143
144         public void waitForRecoveryComplete() {
145             try {
146                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
147             } catch (InterruptedException e) {
148                 e.printStackTrace();
149             }
150         }
151
152         public void waitForInitializeBehaviorComplete() {
153             try {
154                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
155             } catch (InterruptedException e) {
156                 e.printStackTrace();
157             }
158         }
159
160         public List<Object> getState() {
161             return state;
162         }
163
164         public static Props props(final String id, final Map<String, String> peerAddresses,
165                 Optional<ConfigParams> config){
166             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
167         }
168
169         public static Props props(final String id, final Map<String, String> peerAddresses,
170                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
171             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
172         }
173
174         public static Props props(final String id, final Map<String, String> peerAddresses,
175             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
176             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
177         }
178
179         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
180             delegate.applyState(clientActor, identifier, data);
181             LOG.info("applyState called");
182         }
183
184         @Override
185         protected void startLogRecoveryBatch(int maxBatchSize) {
186         }
187
188         @Override
189         protected void appendRecoveredLogEntry(Payload data) {
190             state.add(data);
191         }
192
193         @Override
194         protected void applyCurrentLogRecoveryBatch() {
195         }
196
197         @Override
198         protected void onRecoveryComplete() {
199             delegate.onRecoveryComplete();
200             recoveryComplete.countDown();
201         }
202
203         @Override
204         protected void initializeBehavior() {
205             super.initializeBehavior();
206             initializeBehaviorComplete.countDown();
207         }
208
209         @Override
210         protected void applyRecoverySnapshot(byte[] bytes) {
211             delegate.applyRecoverySnapshot(bytes);
212             try {
213                 Object data = toObject(bytes);
214                 if (data instanceof List) {
215                     state.addAll((List<?>) data);
216                 }
217             } catch (Exception e) {
218                 e.printStackTrace();
219             }
220         }
221
222         @Override protected void createSnapshot() {
223             delegate.createSnapshot();
224         }
225
226         @Override protected void applySnapshot(byte [] snapshot) {
227             delegate.applySnapshot(snapshot);
228         }
229
230         @Override protected void onStateChanged() {
231             delegate.onStateChanged();
232         }
233
234         @Override
235         protected DataPersistenceProvider persistence() {
236             return this.dataPersistenceProvider;
237         }
238
239         @Override
240         protected Optional<ActorRef> getRoleChangeNotifier() {
241             return Optional.fromNullable(roleChangeNotifier);
242         }
243
244         @Override public String persistenceId() {
245             return this.getId();
246         }
247
248         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
249             Object obj = null;
250             ByteArrayInputStream bis = null;
251             ObjectInputStream ois = null;
252             try {
253                 bis = new ByteArrayInputStream(bs);
254                 ois = new ObjectInputStream(bis);
255                 obj = ois.readObject();
256             } finally {
257                 if (bis != null) {
258                     bis.close();
259                 }
260                 if (ois != null) {
261                     ois.close();
262                 }
263             }
264             return obj;
265         }
266
267         public ReplicatedLog getReplicatedLog(){
268             return this.getRaftActorContext().getReplicatedLog();
269         }
270
271     }
272
273
274     private static class RaftActorTestKit extends JavaTestKit {
275         private final ActorRef raftActor;
276
277         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
278             super(actorSystem);
279
280             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
281                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
282
283         }
284
285
286         public ActorRef getRaftActor() {
287             return raftActor;
288         }
289
290         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
291             // Wait for a specific log message to show up
292             return
293                 new JavaTestKit.EventFilter<Boolean>(logEventClass
294                 ) {
295                     @Override
296                     protected Boolean run() {
297                         return true;
298                     }
299                 }.from(raftActor.path().toString())
300                     .message(message)
301                     .occurrences(1).exec();
302
303
304         }
305
306         protected void waitUntilLeader(){
307             waitUntilLeader(raftActor);
308         }
309
310         protected void waitUntilLeader(ActorRef actorRef) {
311             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
312             for(int i = 0; i < 20 * 5; i++) {
313                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
314                 try {
315                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
316                     if(resp.getLeaderActor() != null) {
317                         return;
318                     }
319                 } catch(TimeoutException e) {
320                 } catch(Exception e) {
321                     System.err.println("FindLeader threw ex");
322                     e.printStackTrace();
323                 }
324
325
326                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
327             }
328
329             Assert.fail("Leader not found for actorRef " + actorRef.path());
330         }
331
332     }
333
334
335     @Test
336     public void testConstruction() {
337         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
338     }
339
340     @Test
341     public void testFindLeaderWhenLeaderIsSelf(){
342         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
343         kit.waitUntilLeader();
344     }
345
346     @Test
347     public void testRaftActorRecovery() throws Exception {
348         new JavaTestKit(getSystem()) {{
349             String persistenceId = factory.generateActorId("follower-");
350
351             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
352             // Set the heartbeat interval high to essentially disable election otherwise the test
353             // may fail if the actor is switched to Leader and the commitIndex is set to the last
354             // log entry.
355             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
356
357             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
358                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
359
360             watch(followerActor);
361
362             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
363             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
364                     new MockRaftActorContext.MockPayload("E"));
365             snapshotUnappliedEntries.add(entry1);
366
367             int lastAppliedDuringSnapshotCapture = 3;
368             int lastIndexDuringSnapshotCapture = 4;
369
370             // 4 messages as part of snapshot, which are applied to state
371             ByteString snapshotBytes = fromObject(Arrays.asList(
372                     new MockRaftActorContext.MockPayload("A"),
373                     new MockRaftActorContext.MockPayload("B"),
374                     new MockRaftActorContext.MockPayload("C"),
375                     new MockRaftActorContext.MockPayload("D")));
376
377             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
378                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
379                     lastAppliedDuringSnapshotCapture, 1);
380             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
381
382             // add more entries after snapshot is taken
383             List<ReplicatedLogEntry> entries = new ArrayList<>();
384             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
385                     new MockRaftActorContext.MockPayload("F"));
386             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
387                     new MockRaftActorContext.MockPayload("G"));
388             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
389                     new MockRaftActorContext.MockPayload("H"));
390             entries.add(entry2);
391             entries.add(entry3);
392             entries.add(entry4);
393
394             int lastAppliedToState = 5;
395             int lastIndex = 7;
396
397             InMemoryJournal.addEntry(persistenceId, 5, entry2);
398             // 2 entries are applied to state besides the 4 entries in snapshot
399             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
400             InMemoryJournal.addEntry(persistenceId, 7, entry3);
401             InMemoryJournal.addEntry(persistenceId, 8, entry4);
402
403             // kill the actor
404             followerActor.tell(PoisonPill.getInstance(), null);
405             expectMsgClass(duration("5 seconds"), Terminated.class);
406
407             unwatch(followerActor);
408
409             //reinstate the actor
410             TestActorRef<MockRaftActor> ref = factory.createTestActor(
411                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
412                             Optional.<ConfigParams>of(config)));
413
414             ref.underlyingActor().waitForRecoveryComplete();
415
416             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
417             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
418                     context.getReplicatedLog().size());
419             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
420             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
421             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
422             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
423         }};
424     }
425
426     @Test
427     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
428         new JavaTestKit(getSystem()) {{
429             String persistenceId = factory.generateActorId("leader-");
430
431             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
432             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
433
434             // Setup the persisted journal with some entries
435             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
436                     new MockRaftActorContext.MockPayload("zero"));
437             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
438                     new MockRaftActorContext.MockPayload("oen"));
439             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
440                     new MockRaftActorContext.MockPayload("two"));
441
442             long seqNr = 1;
443             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
444             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
445             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
446             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
447
448             int lastAppliedToState = 1;
449             int lastIndex = 2;
450
451             //reinstate the actor
452             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
453                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
454                             Optional.<ConfigParams>of(config)));
455
456             leaderActor.underlyingActor().waitForRecoveryComplete();
457
458             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
459             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
460             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
461             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
462             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
463         }};
464     }
465
466     /**
467      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
468      * process recovery messages
469      *
470      * @throws Exception
471      */
472
473     @Test
474     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
475         new JavaTestKit(getSystem()) {
476             {
477                 String persistenceId = factory.generateActorId("leader-");
478
479                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
480
481                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
482
483                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
484                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
485
486                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
487
488                 // Wait for akka's recovery to complete so it doesn't interfere.
489                 mockRaftActor.waitForRecoveryComplete();
490
491                 ByteString snapshotBytes = fromObject(Arrays.asList(
492                         new MockRaftActorContext.MockPayload("A"),
493                         new MockRaftActorContext.MockPayload("B"),
494                         new MockRaftActorContext.MockPayload("C"),
495                         new MockRaftActorContext.MockPayload("D")));
496
497                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
498                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
499
500                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
501
502                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
503
504                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
505
506                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
507
508                 assertEquals("add replicated log entry", 1, replicatedLog.size());
509
510                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
511
512                 assertEquals("add replicated log entry", 2, replicatedLog.size());
513
514                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
515
516                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
517
518                 // The snapshot had 4 items + we added 2 more items during the test
519                 // We start removing from 5 and we should get 1 item in the replicated log
520                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
521
522                 assertEquals("remove log entries", 1, replicatedLog.size());
523
524                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
525
526                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
527                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
528
529                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
530
531             }};
532     }
533
534     /**
535      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
536      * not process recovery messages
537      *
538      * @throws Exception
539      */
540     @Test
541     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
542         new JavaTestKit(getSystem()) {
543             {
544                 String persistenceId = factory.generateActorId("leader-");
545
546                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
547
548                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
549
550                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
551                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
552
553                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
554
555                 // Wait for akka's recovery to complete so it doesn't interfere.
556                 mockRaftActor.waitForRecoveryComplete();
557
558                 ByteString snapshotBytes = fromObject(Arrays.asList(
559                         new MockRaftActorContext.MockPayload("A"),
560                         new MockRaftActorContext.MockPayload("B"),
561                         new MockRaftActorContext.MockPayload("C"),
562                         new MockRaftActorContext.MockPayload("D")));
563
564                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
565                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
566
567                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
568
569                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
570
571                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
572
573                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
574
575                 assertEquals("add replicated log entry", 0, replicatedLog.size());
576
577                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
578
579                 assertEquals("add replicated log entry", 0, replicatedLog.size());
580
581                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
582
583                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
584
585                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
586
587                 assertEquals("remove log entries", 0, replicatedLog.size());
588
589                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
590
591                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
592                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
593
594                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
595             }};
596     }
597
598
599     @Test
600     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
601         new JavaTestKit(getSystem()) {
602             {
603                 String persistenceId = factory.generateActorId("leader-");
604
605                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
606
607                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
608
609                 CountDownLatch persistLatch = new CountDownLatch(1);
610                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
611                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
612
613                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
614                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
615
616                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
617
618                 mockRaftActor.waitForInitializeBehaviorComplete();
619
620                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
621
622                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
623             }
624         };
625     }
626
627     @Test
628     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
629         new JavaTestKit(getSystem()) {
630             {
631                 String persistenceId = factory.generateActorId("leader-");
632
633                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
634
635                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
636
637                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
638
639                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
640                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
641
642                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
643
644                 mockRaftActor.waitForInitializeBehaviorComplete();
645
646                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
647
648                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
649
650                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
651             }
652         };
653     }
654
655     @Test
656     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
657         new JavaTestKit(getSystem()) {
658             {
659                 String persistenceId = factory.generateActorId("leader-");
660
661                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
662
663                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
664
665                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
666
667                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
668                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
669
670                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
671
672                 mockRaftActor.waitForInitializeBehaviorComplete();
673
674                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
675
676                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
677
678                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
679             }
680         };
681     }
682
683     @Test
684     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
685         new JavaTestKit(getSystem()) {
686             {
687                 String persistenceId = factory.generateActorId("leader-");
688
689                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
690
691                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
692
693                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
694
695                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
696                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
697
698                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
699
700                 mockRaftActor.waitForInitializeBehaviorComplete();
701
702                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
703
704                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
705
706             }
707
708         };
709     }
710
711     @Test
712     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
713         new JavaTestKit(getSystem()) {
714             {
715                 String persistenceId = factory.generateActorId("leader-");
716
717                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
718
719                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
720
721                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
722
723                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
724                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
725                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
726
727                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
728
729                 mockRaftActor.waitForInitializeBehaviorComplete();
730
731                 ByteString snapshotBytes = fromObject(Arrays.asList(
732                         new MockRaftActorContext.MockPayload("A"),
733                         new MockRaftActorContext.MockPayload("B"),
734                         new MockRaftActorContext.MockPayload("C"),
735                         new MockRaftActorContext.MockPayload("D")));
736
737                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
738
739                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
740
741                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
742
743                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
744
745                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
746
747             }
748         };
749     }
750
751     @Test
752     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
753         new JavaTestKit(getSystem()) {
754             {
755                 String persistenceId = factory.generateActorId("leader-");
756
757                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
758
759                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
760
761                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
762
763                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
764                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
765
766                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
767
768                 mockRaftActor.waitForInitializeBehaviorComplete();
769
770                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
771                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
772                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
773                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
774                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
775
776                 ByteString snapshotBytes = fromObject(Arrays.asList(
777                         new MockRaftActorContext.MockPayload("A"),
778                         new MockRaftActorContext.MockPayload("B"),
779                         new MockRaftActorContext.MockPayload("C"),
780                         new MockRaftActorContext.MockPayload("D")));
781
782                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
783                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
784
785                 long replicatedToAllIndex = 1;
786                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
787
788                 verify(mockRaftActor.delegate).createSnapshot();
789
790                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
791
792                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
793
794                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
795
796                 verify(dataPersistenceProvider).deleteMessages(100);
797
798                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
799                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
800
801                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
802                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
803                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
804
805                 // Index 2 will not be in the log because it was removed due to snapshotting
806                 assertNull(mockRaftActor.getReplicatedLog().get(1));
807                 assertNull(mockRaftActor.getReplicatedLog().get(0));
808
809             }
810         };
811     }
812
813     @Test
814     public void testApplyState() throws Exception {
815
816         new JavaTestKit(getSystem()) {
817             {
818                 String persistenceId = factory.generateActorId("leader-");
819
820                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
821
822                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
823
824                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
825
826                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
827                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
828
829                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
830
831                 mockRaftActor.waitForInitializeBehaviorComplete();
832
833                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
834                         new MockRaftActorContext.MockPayload("F"));
835
836                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
837
838                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
839
840             }
841         };
842     }
843
844     @Test
845     public void testApplySnapshot() throws Exception {
846         new JavaTestKit(getSystem()) {
847             {
848                 String persistenceId = factory.generateActorId("leader-");
849
850                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
851
852                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
853
854                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
855
856                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
857                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
858
859                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
860
861                 mockRaftActor.waitForInitializeBehaviorComplete();
862
863                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
864
865                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
866                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
867                 oldReplicatedLog.append(
868                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
869                                 mock(Payload.class)));
870
871                 ByteString snapshotBytes = fromObject(Arrays.asList(
872                         new MockRaftActorContext.MockPayload("A"),
873                         new MockRaftActorContext.MockPayload("B"),
874                         new MockRaftActorContext.MockPayload("C"),
875                         new MockRaftActorContext.MockPayload("D")));
876
877                 Snapshot snapshot = mock(Snapshot.class);
878
879                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
880
881                 doReturn(3L).when(snapshot).getLastAppliedIndex();
882
883                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
884
885                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
886
887                 assertTrue("The replicatedLog should have changed",
888                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
889
890                 assertEquals("lastApplied should be same as in the snapshot",
891                         (Long) 3L, mockRaftActor.getLastApplied());
892
893                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
894
895             }
896         };
897     }
898
899     @Test
900     public void testSaveSnapshotFailure() throws Exception {
901         new JavaTestKit(getSystem()) {
902             {
903                 String persistenceId = factory.generateActorId("leader-");
904
905                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
906
907                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
908
909                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
910
911                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
912                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
913
914                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
915
916                 mockRaftActor.waitForInitializeBehaviorComplete();
917
918                 ByteString snapshotBytes = fromObject(Arrays.asList(
919                         new MockRaftActorContext.MockPayload("A"),
920                         new MockRaftActorContext.MockPayload("B"),
921                         new MockRaftActorContext.MockPayload("C"),
922                         new MockRaftActorContext.MockPayload("D")));
923
924                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
925
926                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
927
928                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
929
930                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
931
932                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
933                         new Exception()));
934
935                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
936                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
937
938             }
939         };
940     }
941
942     @Test
943     public void testRaftRoleChangeNotifier() throws Exception {
944         new JavaTestKit(getSystem()) {{
945             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
946             MessageCollectorActor.waitUntilReady(notifierActor);
947
948             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
949             long heartBeatInterval = 100;
950             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
951             config.setElectionTimeoutFactor(1);
952
953             String persistenceId = factory.generateActorId("notifier-");
954
955             factory.createTestActor(MockRaftActor.props(persistenceId,
956                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
957
958             List<RoleChanged> matches =  null;
959             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
960                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
961                 assertNotNull(matches);
962                 if(matches.size() == 3) {
963                     break;
964                 }
965                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
966             }
967
968             assertEquals(3, matches.size());
969
970             // check if the notifier got a role change from null to Follower
971             RoleChanged raftRoleChanged = matches.get(0);
972             assertEquals(persistenceId, raftRoleChanged.getMemberId());
973             assertNull(raftRoleChanged.getOldRole());
974             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
975
976             // check if the notifier got a role change from Follower to Candidate
977             raftRoleChanged = matches.get(1);
978             assertEquals(persistenceId, raftRoleChanged.getMemberId());
979             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
980             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
981
982             // check if the notifier got a role change from Candidate to Leader
983             raftRoleChanged = matches.get(2);
984             assertEquals(persistenceId, raftRoleChanged.getMemberId());
985             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
986             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
987         }};
988     }
989
990     @Test
991     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
992         new JavaTestKit(getSystem()) {
993             {
994                 String persistenceId = factory.generateActorId("leader-");
995                 String follower1Id = factory.generateActorId("follower-");
996
997                 ActorRef followerActor1 =
998                         factory.createActor(Props.create(MessageCollectorActor.class));
999
1000                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1001                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1002                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1003
1004                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1005
1006                 Map<String, String> peerAddresses = new HashMap<>();
1007                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1008
1009                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1010                         MockRaftActor.props(persistenceId, peerAddresses,
1011                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1012
1013                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1014
1015                 leaderActor.getRaftActorContext().setCommitIndex(4);
1016                 leaderActor.getRaftActorContext().setLastApplied(4);
1017                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1018
1019                 leaderActor.waitForInitializeBehaviorComplete();
1020
1021                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1022
1023                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1024                 leaderActor.setCurrentBehavior(leader);
1025                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1026
1027                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1028                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1029
1030                 assertEquals(8, leaderActor.getReplicatedLog().size());
1031
1032                 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
1033
1034                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1035                 verify(leaderActor.delegate).createSnapshot();
1036
1037                 assertEquals(8, leaderActor.getReplicatedLog().size());
1038
1039                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1040                 //fake snapshot on index 5
1041                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1042
1043                 assertEquals(8, leaderActor.getReplicatedLog().size());
1044
1045                 //fake snapshot on index 6
1046                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1047                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1048                 assertEquals(8, leaderActor.getReplicatedLog().size());
1049
1050                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1051
1052                 assertEquals(8, leaderActor.getReplicatedLog().size());
1053
1054                 ByteString snapshotBytes = fromObject(Arrays.asList(
1055                         new MockRaftActorContext.MockPayload("foo-0"),
1056                         new MockRaftActorContext.MockPayload("foo-1"),
1057                         new MockRaftActorContext.MockPayload("foo-2"),
1058                         new MockRaftActorContext.MockPayload("foo-3"),
1059                         new MockRaftActorContext.MockPayload("foo-4")));
1060                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1061                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1062
1063                 // capture snapshot reply should remove the snapshotted entries only
1064                 assertEquals(3, leaderActor.getReplicatedLog().size());
1065                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1066
1067                 // add another non-replicated entry
1068                 leaderActor.getReplicatedLog().append(
1069                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1070
1071                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1072                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1073                 assertEquals(2, leaderActor.getReplicatedLog().size());
1074                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1075
1076             }
1077         };
1078     }
1079
1080     @Test
1081     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1082         new JavaTestKit(getSystem()) {
1083             {
1084                 String persistenceId = factory.generateActorId("follower-");
1085                 String leaderId = factory.generateActorId("leader-");
1086
1087
1088                 ActorRef leaderActor1 =
1089                         factory.createActor(Props.create(MessageCollectorActor.class));
1090
1091                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1092                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1093                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1094
1095                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1096
1097                 Map<String, String> peerAddresses = new HashMap<>();
1098                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1099
1100                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1101                         MockRaftActor.props(persistenceId, peerAddresses,
1102                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1103
1104                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1105                 followerActor.getRaftActorContext().setCommitIndex(4);
1106                 followerActor.getRaftActorContext().setLastApplied(4);
1107                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1108
1109                 followerActor.waitForInitializeBehaviorComplete();
1110
1111
1112                 Follower follower = new Follower(followerActor.getRaftActorContext());
1113                 followerActor.setCurrentBehavior(follower);
1114                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1115
1116                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1117                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1118                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1119
1120                 // log has indices 0-5
1121                 assertEquals(6, followerActor.getReplicatedLog().size());
1122
1123                 //snapshot on 4
1124                 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1125
1126                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1127                 verify(followerActor.delegate).createSnapshot();
1128
1129                 assertEquals(6, followerActor.getReplicatedLog().size());
1130
1131                 //fake snapshot on index 6
1132                 List<ReplicatedLogEntry> entries =
1133                         Arrays.asList(
1134                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1135                                         new MockRaftActorContext.MockPayload("foo-6"))
1136                         );
1137                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1138                 assertEquals(7, followerActor.getReplicatedLog().size());
1139
1140                 //fake snapshot on index 7
1141                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1142
1143                 entries =
1144                         Arrays.asList(
1145                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1146                                         new MockRaftActorContext.MockPayload("foo-7"))
1147                         );
1148                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1149                 assertEquals(8, followerActor.getReplicatedLog().size());
1150
1151                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1152
1153
1154                 ByteString snapshotBytes = fromObject(Arrays.asList(
1155                         new MockRaftActorContext.MockPayload("foo-0"),
1156                         new MockRaftActorContext.MockPayload("foo-1"),
1157                         new MockRaftActorContext.MockPayload("foo-2"),
1158                         new MockRaftActorContext.MockPayload("foo-3"),
1159                         new MockRaftActorContext.MockPayload("foo-4")));
1160                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1161                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1162
1163                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1164                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1165                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1166
1167                 entries =
1168                         Arrays.asList(
1169                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1170                                         new MockRaftActorContext.MockPayload("foo-7"))
1171                         );
1172                 // send an additional entry 8 with leaderCommit = 7
1173                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1174
1175                 // 7 and 8, as lastapplied is 7
1176                 assertEquals(2, followerActor.getReplicatedLog().size());
1177
1178             }
1179         };
1180     }
1181
1182     @Test
1183     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1184         new JavaTestKit(getSystem()) {
1185             {
1186                 String persistenceId = factory.generateActorId("leader-");
1187                 String follower1Id = factory.generateActorId("follower-");
1188                 String follower2Id = factory.generateActorId("follower-");
1189
1190                 ActorRef followerActor1 =
1191                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1192                 ActorRef followerActor2 =
1193                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1194
1195                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1196                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1197                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1198
1199                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1200
1201                 Map<String, String> peerAddresses = new HashMap<>();
1202                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1203                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1204
1205                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1206                         MockRaftActor.props(persistenceId, peerAddresses,
1207                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1208
1209                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1210                 leaderActor.getRaftActorContext().setCommitIndex(9);
1211                 leaderActor.getRaftActorContext().setLastApplied(9);
1212                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1213
1214                 leaderActor.waitForInitializeBehaviorComplete();
1215
1216                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1217                 leaderActor.setCurrentBehavior(leader);
1218                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1219
1220                 // create 5 entries in the log
1221                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1222                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1223
1224                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1225                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1226                 //setting replicatedToAllIndex = 9, for the log to clear
1227                 leader.setReplicatedToAllIndex(9);
1228                 assertEquals(5, leaderActor.getReplicatedLog().size());
1229                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1230
1231                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1232                 assertEquals(5, leaderActor.getReplicatedLog().size());
1233                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1234
1235                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1236                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1237                 assertEquals(5, leaderActor.getReplicatedLog().size());
1238                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1239
1240                 // simulate a real snapshot
1241                 leaderActor.onReceiveCommand(new SendHeartBeat());
1242                 assertEquals(5, leaderActor.getReplicatedLog().size());
1243                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1244                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1245                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1246
1247
1248                 //reply from a slow follower does not initiate a fake snapshot
1249                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1250                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1251
1252                 ByteString snapshotBytes = fromObject(Arrays.asList(
1253                         new MockRaftActorContext.MockPayload("foo-0"),
1254                         new MockRaftActorContext.MockPayload("foo-1"),
1255                         new MockRaftActorContext.MockPayload("foo-2"),
1256                         new MockRaftActorContext.MockPayload("foo-3"),
1257                         new MockRaftActorContext.MockPayload("foo-4")));
1258                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1259                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1260
1261                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1262
1263                 //reply from a slow follower after should not raise errors
1264                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1265                 assertEquals(0, leaderActor.getReplicatedLog().size());
1266             }
1267         };
1268     }
1269
1270
1271     private static class NonPersistentProvider implements DataPersistenceProvider {
1272         @Override
1273         public boolean isRecoveryApplicable() {
1274             return false;
1275         }
1276
1277         @Override
1278         public <T> void persist(T o, Procedure<T> procedure) {
1279             try {
1280                 procedure.apply(o);
1281             } catch (Exception e) {
1282                 e.printStackTrace();
1283             }
1284         }
1285
1286         @Override
1287         public void saveSnapshot(Object o) {
1288
1289         }
1290
1291         @Override
1292         public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1293
1294         }
1295
1296         @Override
1297         public void deleteMessages(long sequenceNumber) {
1298
1299         }
1300     }
1301
1302     @Test
1303     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1304         new JavaTestKit(getSystem()) {{
1305             String persistenceId = factory.generateActorId("leader-");
1306             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1307             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1308             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1309             config.setSnapshotBatchCount(5);
1310
1311             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1312
1313             Map<String, String> peerAddresses = new HashMap<>();
1314
1315             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1316                     MockRaftActor.props(persistenceId, peerAddresses,
1317                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1318
1319             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1320             leaderActor.getRaftActorContext().setCommitIndex(3);
1321             leaderActor.getRaftActorContext().setLastApplied(3);
1322             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1323
1324             leaderActor.waitForInitializeBehaviorComplete();
1325             for(int i=0;i< 4;i++) {
1326                 leaderActor.getReplicatedLog()
1327                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1328                                 new MockRaftActorContext.MockPayload("A")));
1329             }
1330
1331             Leader leader = new Leader(leaderActor.getRaftActorContext());
1332             leaderActor.setCurrentBehavior(leader);
1333             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1334
1335             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1336             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1337
1338             // Now send a CaptureSnapshotReply
1339             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1340
1341             // Trimming log in this scenario is a no-op
1342             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1343             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1344             assertEquals(-1, leader.getReplicatedToAllIndex());
1345
1346         }};
1347     }
1348
1349     @Test
1350     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1351         new JavaTestKit(getSystem()) {{
1352             String persistenceId = factory.generateActorId("leader-");
1353             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1354             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1355             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1356             config.setSnapshotBatchCount(5);
1357
1358             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1359
1360             Map<String, String> peerAddresses = new HashMap<>();
1361
1362             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1363                     MockRaftActor.props(persistenceId, peerAddresses,
1364                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1365
1366             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1367             leaderActor.getRaftActorContext().setCommitIndex(3);
1368             leaderActor.getRaftActorContext().setLastApplied(3);
1369             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1370             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1371
1372             leaderActor.waitForInitializeBehaviorComplete();
1373             Leader leader = new Leader(leaderActor.getRaftActorContext());
1374             leaderActor.setCurrentBehavior(leader);
1375             leader.setReplicatedToAllIndex(3);
1376             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1377
1378             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1379             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1380
1381             // Now send a CaptureSnapshotReply
1382             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1383
1384             // Trimming log in this scenario is a no-op
1385             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1386             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1387             assertEquals(3, leader.getReplicatedToAllIndex());
1388
1389         }};
1390     }
1391
1392     private ByteString fromObject(Object snapshot) throws Exception {
1393         ByteArrayOutputStream b = null;
1394         ObjectOutputStream o = null;
1395         try {
1396             b = new ByteArrayOutputStream();
1397             o = new ObjectOutputStream(b);
1398             o.writeObject(snapshot);
1399             byte[] snapshotBytes = b.toByteArray();
1400             return ByteString.copyFrom(snapshotBytes);
1401         } finally {
1402             if (o != null) {
1403                 o.flush();
1404                 o.close();
1405             }
1406             if (b != null) {
1407                 b.close();
1408             }
1409         }
1410     }
1411
1412 }