4cb555c4b547bb9cd3c796644da2f74352e75521
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
58 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RoleChanged;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
65 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
67 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
68 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
69 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
71 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
73 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
74 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
75 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
76 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
77 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
78 import scala.concurrent.Await;
79 import scala.concurrent.Future;
80 import scala.concurrent.duration.Duration;
81 import scala.concurrent.duration.FiniteDuration;
82
83 public class RaftActorTest extends AbstractActorTest {
84
85     private TestActorFactory factory;
86
87     @Before
88     public void setUp(){
89         factory = new TestActorFactory(getSystem());
90     }
91
92     @After
93     public void tearDown() throws Exception {
94         factory.close();
95         InMemoryJournal.clear();
96         InMemorySnapshotStore.clear();
97     }
98
99     public static class MockRaftActor extends RaftActor {
100
101         private final RaftActor delegate;
102         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
103         private final List<Object> state;
104         private ActorRef roleChangeNotifier;
105         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
106
107         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
108             private static final long serialVersionUID = 1L;
109             private final Map<String, String> peerAddresses;
110             private final String id;
111             private final Optional<ConfigParams> config;
112             private final DataPersistenceProvider dataPersistenceProvider;
113             private final ActorRef roleChangeNotifier;
114
115             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
116                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
117                 ActorRef roleChangeNotifier) {
118                 this.peerAddresses = peerAddresses;
119                 this.id = id;
120                 this.config = config;
121                 this.dataPersistenceProvider = dataPersistenceProvider;
122                 this.roleChangeNotifier = roleChangeNotifier;
123             }
124
125             @Override
126             public MockRaftActor create() throws Exception {
127                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
128                     dataPersistenceProvider);
129                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
130                 return mockRaftActor;
131             }
132         }
133
134         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
135                              DataPersistenceProvider dataPersistenceProvider) {
136             super(id, peerAddresses, config);
137             state = new ArrayList<>();
138             this.delegate = mock(RaftActor.class);
139             if(dataPersistenceProvider == null){
140                 setPersistence(true);
141             } else {
142                 setPersistence(dataPersistenceProvider);
143             }
144         }
145
146         public void waitForRecoveryComplete() {
147             try {
148                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
149             } catch (InterruptedException e) {
150                 e.printStackTrace();
151             }
152         }
153
154         public void waitForInitializeBehaviorComplete() {
155             try {
156                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
157             } catch (InterruptedException e) {
158                 e.printStackTrace();
159             }
160         }
161
162
163         public void waitUntilLeader(){
164             for(int i = 0;i < 10; i++){
165                 if(isLeader()){
166                     break;
167                 }
168                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
169             }
170         }
171
172         public List<Object> getState() {
173             return state;
174         }
175
176         public static Props props(final String id, final Map<String, String> peerAddresses,
177                 Optional<ConfigParams> config){
178             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
179         }
180
181         public static Props props(final String id, final Map<String, String> peerAddresses,
182                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
183             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
184         }
185
186         public static Props props(final String id, final Map<String, String> peerAddresses,
187             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
188             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
189         }
190
191         public static Props props(final String id, final Map<String, String> peerAddresses,
192                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
193                                   DataPersistenceProvider dataPersistenceProvider){
194             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
195         }
196
197
198         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
199             delegate.applyState(clientActor, identifier, data);
200             LOG.info("{}: applyState called", persistenceId());
201         }
202
203         @Override
204         protected void startLogRecoveryBatch(int maxBatchSize) {
205         }
206
207         @Override
208         protected void appendRecoveredLogEntry(Payload data) {
209             state.add(data);
210         }
211
212         @Override
213         protected void applyCurrentLogRecoveryBatch() {
214         }
215
216         @Override
217         protected void onRecoveryComplete() {
218             delegate.onRecoveryComplete();
219             recoveryComplete.countDown();
220         }
221
222         @Override
223         protected void initializeBehavior() {
224             super.initializeBehavior();
225             initializeBehaviorComplete.countDown();
226         }
227
228         @Override
229         protected void applyRecoverySnapshot(byte[] bytes) {
230             delegate.applyRecoverySnapshot(bytes);
231             try {
232                 Object data = toObject(bytes);
233                 if (data instanceof List) {
234                     state.addAll((List<?>) data);
235                 }
236             } catch (Exception e) {
237                 e.printStackTrace();
238             }
239         }
240
241         @Override protected void createSnapshot() {
242             LOG.info("{}: createSnapshot called", persistenceId());
243             delegate.createSnapshot();
244         }
245
246         @Override protected void applySnapshot(byte [] snapshot) {
247             LOG.info("{}: applySnapshot called", persistenceId());
248             delegate.applySnapshot(snapshot);
249         }
250
251         @Override protected void onStateChanged() {
252             delegate.onStateChanged();
253         }
254
255         @Override
256         protected Optional<ActorRef> getRoleChangeNotifier() {
257             return Optional.fromNullable(roleChangeNotifier);
258         }
259
260         @Override public String persistenceId() {
261             return this.getId();
262         }
263
264         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
265             Object obj = null;
266             ByteArrayInputStream bis = null;
267             ObjectInputStream ois = null;
268             try {
269                 bis = new ByteArrayInputStream(bs);
270                 ois = new ObjectInputStream(bis);
271                 obj = ois.readObject();
272             } finally {
273                 if (bis != null) {
274                     bis.close();
275                 }
276                 if (ois != null) {
277                     ois.close();
278                 }
279             }
280             return obj;
281         }
282
283         public ReplicatedLog getReplicatedLog(){
284             return this.getRaftActorContext().getReplicatedLog();
285         }
286
287     }
288
289
290     public static class RaftActorTestKit extends JavaTestKit {
291         private final ActorRef raftActor;
292
293         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
294             super(actorSystem);
295
296             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
297                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
298
299         }
300
301
302         public ActorRef getRaftActor() {
303             return raftActor;
304         }
305
306         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
307             // Wait for a specific log message to show up
308             return
309                 new JavaTestKit.EventFilter<Boolean>(logEventClass
310                 ) {
311                     @Override
312                     protected Boolean run() {
313                         return true;
314                     }
315                 }.from(raftActor.path().toString())
316                     .message(message)
317                     .occurrences(1).exec();
318
319
320         }
321
322         protected void waitUntilLeader(){
323             waitUntilLeader(raftActor);
324         }
325
326         public static void waitUntilLeader(ActorRef actorRef) {
327             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
328             for(int i = 0; i < 20 * 5; i++) {
329                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
330                 try {
331                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
332                     if(resp.getLeaderActor() != null) {
333                         return;
334                     }
335                 } catch(TimeoutException e) {
336                 } catch(Exception e) {
337                     System.err.println("FindLeader threw ex");
338                     e.printStackTrace();
339                 }
340
341
342                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
343             }
344
345             Assert.fail("Leader not found for actorRef " + actorRef.path());
346         }
347
348     }
349
350
351     @Test
352     public void testConstruction() {
353         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
354     }
355
356     @Test
357     public void testFindLeaderWhenLeaderIsSelf(){
358         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
359         kit.waitUntilLeader();
360     }
361
362     @Test
363     public void testRaftActorRecovery() throws Exception {
364         new JavaTestKit(getSystem()) {{
365             String persistenceId = factory.generateActorId("follower-");
366
367             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
368             // Set the heartbeat interval high to essentially disable election otherwise the test
369             // may fail if the actor is switched to Leader and the commitIndex is set to the last
370             // log entry.
371             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
372
373             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
374                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
375
376             watch(followerActor);
377
378             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
379             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
380                     new MockRaftActorContext.MockPayload("E"));
381             snapshotUnappliedEntries.add(entry1);
382
383             int lastAppliedDuringSnapshotCapture = 3;
384             int lastIndexDuringSnapshotCapture = 4;
385
386             // 4 messages as part of snapshot, which are applied to state
387             ByteString snapshotBytes = fromObject(Arrays.asList(
388                     new MockRaftActorContext.MockPayload("A"),
389                     new MockRaftActorContext.MockPayload("B"),
390                     new MockRaftActorContext.MockPayload("C"),
391                     new MockRaftActorContext.MockPayload("D")));
392
393             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
394                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
395                     lastAppliedDuringSnapshotCapture, 1);
396             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
397
398             // add more entries after snapshot is taken
399             List<ReplicatedLogEntry> entries = new ArrayList<>();
400             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
401                     new MockRaftActorContext.MockPayload("F"));
402             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
403                     new MockRaftActorContext.MockPayload("G"));
404             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
405                     new MockRaftActorContext.MockPayload("H"));
406             entries.add(entry2);
407             entries.add(entry3);
408             entries.add(entry4);
409
410             int lastAppliedToState = 5;
411             int lastIndex = 7;
412
413             InMemoryJournal.addEntry(persistenceId, 5, entry2);
414             // 2 entries are applied to state besides the 4 entries in snapshot
415             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
416             InMemoryJournal.addEntry(persistenceId, 7, entry3);
417             InMemoryJournal.addEntry(persistenceId, 8, entry4);
418
419             // kill the actor
420             followerActor.tell(PoisonPill.getInstance(), null);
421             expectMsgClass(duration("5 seconds"), Terminated.class);
422
423             unwatch(followerActor);
424
425             //reinstate the actor
426             TestActorRef<MockRaftActor> ref = factory.createTestActor(
427                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
428                             Optional.<ConfigParams>of(config)));
429
430             ref.underlyingActor().waitForRecoveryComplete();
431
432             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
433             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
434                     context.getReplicatedLog().size());
435             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
436             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
437             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
438             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
439         }};
440     }
441
442     @Test
443     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
444         new JavaTestKit(getSystem()) {{
445             String persistenceId = factory.generateActorId("leader-");
446
447             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
448             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
449
450             // Setup the persisted journal with some entries
451             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
452                     new MockRaftActorContext.MockPayload("zero"));
453             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
454                     new MockRaftActorContext.MockPayload("oen"));
455             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
456                     new MockRaftActorContext.MockPayload("two"));
457
458             long seqNr = 1;
459             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
460             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
461             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
462             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
463
464             int lastAppliedToState = 1;
465             int lastIndex = 2;
466
467             //reinstate the actor
468             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
469                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
470                             Optional.<ConfigParams>of(config)));
471
472             leaderActor.underlyingActor().waitForRecoveryComplete();
473
474             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
475             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
476             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
477             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
478             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
479         }};
480     }
481
482     /**
483      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
484      * process recovery messages
485      *
486      * @throws Exception
487      */
488
489     @Test
490     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
491         new JavaTestKit(getSystem()) {
492             {
493                 String persistenceId = factory.generateActorId("leader-");
494
495                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
496
497                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
498
499                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
500                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
501
502                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
503
504                 // Wait for akka's recovery to complete so it doesn't interfere.
505                 mockRaftActor.waitForRecoveryComplete();
506
507                 ByteString snapshotBytes = fromObject(Arrays.asList(
508                         new MockRaftActorContext.MockPayload("A"),
509                         new MockRaftActorContext.MockPayload("B"),
510                         new MockRaftActorContext.MockPayload("C"),
511                         new MockRaftActorContext.MockPayload("D")));
512
513                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
514                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
515
516                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
517
518                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
519
520                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
521
522                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
523
524                 assertEquals("add replicated log entry", 1, replicatedLog.size());
525
526                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
527
528                 assertEquals("add replicated log entry", 2, replicatedLog.size());
529
530                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
531
532                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
533
534                 // The snapshot had 4 items + we added 2 more items during the test
535                 // We start removing from 5 and we should get 1 item in the replicated log
536                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
537
538                 assertEquals("remove log entries", 1, replicatedLog.size());
539
540                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
541
542                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
543                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
544
545                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
546
547             }};
548     }
549
550     /**
551      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
552      * not process recovery messages
553      *
554      * @throws Exception
555      */
556     @Test
557     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
558         new JavaTestKit(getSystem()) {
559             {
560                 String persistenceId = factory.generateActorId("leader-");
561
562                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
563
564                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
565
566                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
567                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
568
569                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
570
571                 // Wait for akka's recovery to complete so it doesn't interfere.
572                 mockRaftActor.waitForRecoveryComplete();
573
574                 ByteString snapshotBytes = fromObject(Arrays.asList(
575                         new MockRaftActorContext.MockPayload("A"),
576                         new MockRaftActorContext.MockPayload("B"),
577                         new MockRaftActorContext.MockPayload("C"),
578                         new MockRaftActorContext.MockPayload("D")));
579
580                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
581                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
582
583                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
584
585                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
586
587                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
588
589                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
590
591                 assertEquals("add replicated log entry", 0, replicatedLog.size());
592
593                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
594
595                 assertEquals("add replicated log entry", 0, replicatedLog.size());
596
597                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
598
599                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
600
601                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
602
603                 assertEquals("remove log entries", 0, replicatedLog.size());
604
605                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
606
607                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
608                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
609
610                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
611             }};
612     }
613
614
615     @Test
616     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
617         new JavaTestKit(getSystem()) {
618             {
619                 String persistenceId = factory.generateActorId("leader-");
620
621                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
622
623                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
624
625                 CountDownLatch persistLatch = new CountDownLatch(1);
626                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
627                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
628
629                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
630                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
631
632                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
633
634                 mockRaftActor.waitForInitializeBehaviorComplete();
635
636                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
637
638                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
639             }
640         };
641     }
642
643     @Test
644     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
645         new JavaTestKit(getSystem()) {
646             {
647                 String persistenceId = factory.generateActorId("leader-");
648
649                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
650
651                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
652
653                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
654
655                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
656                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
657
658                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
659
660                 mockRaftActor.waitForInitializeBehaviorComplete();
661
662                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
663
664                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
665
666                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
667             }
668         };
669     }
670
671     @Test
672     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
673         new JavaTestKit(getSystem()) {
674             {
675                 String persistenceId = factory.generateActorId("leader-");
676
677                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
678
679                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
680
681                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
682
683                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
684                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
685
686                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
687
688                 mockRaftActor.waitForInitializeBehaviorComplete();
689
690                 mockRaftActor.waitUntilLeader();
691
692                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
693
694                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
695
696                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
697             }
698         };
699     }
700
701     @Test
702     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
703         new JavaTestKit(getSystem()) {
704             {
705                 String persistenceId = factory.generateActorId("leader-");
706
707                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
708
709                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
710
711                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
712
713                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
714                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
715
716                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
717
718                 mockRaftActor.waitForInitializeBehaviorComplete();
719
720                 mockRaftActor.waitUntilLeader();
721
722                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
723
724                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
725
726             }
727
728         };
729     }
730
731     @Test
732     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
733         new JavaTestKit(getSystem()) {
734             {
735                 String persistenceId = factory.generateActorId("leader-");
736
737                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
738
739                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
740
741                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
742
743                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
744                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
745                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
746
747                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
748
749                 mockRaftActor.waitForInitializeBehaviorComplete();
750
751                 ByteString snapshotBytes = fromObject(Arrays.asList(
752                         new MockRaftActorContext.MockPayload("A"),
753                         new MockRaftActorContext.MockPayload("B"),
754                         new MockRaftActorContext.MockPayload("C"),
755                         new MockRaftActorContext.MockPayload("D")));
756
757                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
758
759                 raftActorContext.getSnapshotManager().capture(
760                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
761                                 new MockRaftActorContext.MockPayload("D")), -1);
762
763                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
764
765                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
766
767                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
768
769             }
770         };
771     }
772
773     @Test
774     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
775         new JavaTestKit(getSystem()) {
776             {
777                 String persistenceId = factory.generateActorId("leader-");
778
779                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
780
781                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
782
783                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
784
785                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
786                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
787
788                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
789
790                 mockRaftActor.waitForInitializeBehaviorComplete();
791                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
792
793                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
794                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
795                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
796                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
797                 mockRaftActor.getReplicatedLog().append(lastEntry);
798
799                 ByteString snapshotBytes = fromObject(Arrays.asList(
800                         new MockRaftActorContext.MockPayload("A"),
801                         new MockRaftActorContext.MockPayload("B"),
802                         new MockRaftActorContext.MockPayload("C"),
803                         new MockRaftActorContext.MockPayload("D")));
804
805                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
806                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
807
808                 long replicatedToAllIndex = 1;
809
810                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
811
812                 verify(mockRaftActor.delegate).createSnapshot();
813
814                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
815
816                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
817
818                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
819
820                 verify(dataPersistenceProvider).deleteMessages(100);
821
822                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
823                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
824
825                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
826                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
827                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
828
829                 // Index 2 will not be in the log because it was removed due to snapshotting
830                 assertNull(mockRaftActor.getReplicatedLog().get(1));
831                 assertNull(mockRaftActor.getReplicatedLog().get(0));
832
833             }
834         };
835     }
836
837     @Test
838     public void testApplyState() throws Exception {
839
840         new JavaTestKit(getSystem()) {
841             {
842                 String persistenceId = factory.generateActorId("leader-");
843
844                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
845
846                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
847
848                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
849
850                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
851                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
852
853                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
854
855                 mockRaftActor.waitForInitializeBehaviorComplete();
856
857                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
858                         new MockRaftActorContext.MockPayload("F"));
859
860                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
861
862                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
863
864             }
865         };
866     }
867
868     @Test
869     public void testApplySnapshot() throws Exception {
870         new JavaTestKit(getSystem()) {
871             {
872                 String persistenceId = factory.generateActorId("leader-");
873
874                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
875
876                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
877
878                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
879
880                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
881                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
882
883                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
884
885                 mockRaftActor.waitForInitializeBehaviorComplete();
886
887                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
888
889                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
890                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
891                 oldReplicatedLog.append(
892                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
893                                 mock(Payload.class)));
894
895                 ByteString snapshotBytes = fromObject(Arrays.asList(
896                         new MockRaftActorContext.MockPayload("A"),
897                         new MockRaftActorContext.MockPayload("B"),
898                         new MockRaftActorContext.MockPayload("C"),
899                         new MockRaftActorContext.MockPayload("D")));
900
901                 Snapshot snapshot = mock(Snapshot.class);
902
903                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
904
905                 doReturn(3L).when(snapshot).getLastAppliedIndex();
906
907                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
908
909                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
910
911                 assertTrue("The replicatedLog should have changed",
912                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
913
914                 assertEquals("lastApplied should be same as in the snapshot",
915                         (Long) 3L, mockRaftActor.getLastApplied());
916
917                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
918
919             }
920         };
921     }
922
923     @Test
924     public void testSaveSnapshotFailure() throws Exception {
925         new JavaTestKit(getSystem()) {
926             {
927                 String persistenceId = factory.generateActorId("leader-");
928
929                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
930
931                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
932
933                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
934
935                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
936                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
937
938                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
939
940                 mockRaftActor.waitForInitializeBehaviorComplete();
941
942                 ByteString snapshotBytes = fromObject(Arrays.asList(
943                         new MockRaftActorContext.MockPayload("A"),
944                         new MockRaftActorContext.MockPayload("B"),
945                         new MockRaftActorContext.MockPayload("C"),
946                         new MockRaftActorContext.MockPayload("D")));
947
948                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
949
950                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
951
952                 raftActorContext.getSnapshotManager().capture(
953                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
954                                 new MockRaftActorContext.MockPayload("D")), 1);
955
956                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
957
958                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
959                         new Exception()));
960
961                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
962                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
963
964             }
965         };
966     }
967
968     @Test
969     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
970         new JavaTestKit(getSystem()) {{
971             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
972                     Props.create(MessageCollectorActor.class));
973             MessageCollectorActor.waitUntilReady(notifierActor);
974
975             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
976             long heartBeatInterval = 100;
977             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
978             config.setElectionTimeoutFactor(20);
979
980             String persistenceId = factory.generateActorId("notifier-");
981
982             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
983                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
984                     new NonPersistentDataProvider()), persistenceId);
985
986             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
987
988
989             // check if the notifier got a role change from null to Follower
990             RoleChanged raftRoleChanged = matches.get(0);
991             assertEquals(persistenceId, raftRoleChanged.getMemberId());
992             assertNull(raftRoleChanged.getOldRole());
993             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
994
995             // check if the notifier got a role change from Follower to Candidate
996             raftRoleChanged = matches.get(1);
997             assertEquals(persistenceId, raftRoleChanged.getMemberId());
998             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
999             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1000
1001             // check if the notifier got a role change from Candidate to Leader
1002             raftRoleChanged = matches.get(2);
1003             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1004             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1005             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1006
1007             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1008                     notifierActor, LeaderStateChanged.class);
1009
1010             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1011
1012             notifierActor.underlyingActor().clear();
1013
1014             MockRaftActor raftActor = raftActorRef.underlyingActor();
1015             final String newLeaderId = "new-leader";
1016             Follower follower = new Follower(raftActor.getRaftActorContext()) {
1017                 @Override
1018                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1019                     leaderId = newLeaderId;
1020                     return this;
1021                 }
1022             };
1023
1024             raftActor.changeCurrentBehavior(follower);
1025
1026             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1027             assertEquals(persistenceId, leaderStateChange.getMemberId());
1028             assertEquals(null, leaderStateChange.getLeaderId());
1029
1030             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1031             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1032             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1033
1034             notifierActor.underlyingActor().clear();
1035
1036             raftActor.handleCommand("any");
1037
1038             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1039             assertEquals(persistenceId, leaderStateChange.getMemberId());
1040             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1041         }};
1042     }
1043
1044     @Test
1045     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1046         new JavaTestKit(getSystem()) {{
1047             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1048             MessageCollectorActor.waitUntilReady(notifierActor);
1049
1050             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1051             long heartBeatInterval = 100;
1052             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1053             config.setElectionTimeoutFactor(1);
1054
1055             String persistenceId = factory.generateActorId("notifier-");
1056
1057             factory.createActor(MockRaftActor.props(persistenceId,
1058                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1059
1060             List<RoleChanged> matches =  null;
1061             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1062                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1063                 assertNotNull(matches);
1064                 if(matches.size() == 3) {
1065                     break;
1066                 }
1067                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1068             }
1069
1070             assertEquals(2, matches.size());
1071
1072             // check if the notifier got a role change from null to Follower
1073             RoleChanged raftRoleChanged = matches.get(0);
1074             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1075             assertNull(raftRoleChanged.getOldRole());
1076             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1077
1078             // check if the notifier got a role change from Follower to Candidate
1079             raftRoleChanged = matches.get(1);
1080             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1081             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1082             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1083
1084         }};
1085     }
1086
1087     @Test
1088     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1089         new JavaTestKit(getSystem()) {
1090             {
1091                 String persistenceId = factory.generateActorId("leader-");
1092                 String follower1Id = factory.generateActorId("follower-");
1093
1094                 ActorRef followerActor1 =
1095                         factory.createActor(Props.create(MessageCollectorActor.class));
1096
1097                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1098                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1099                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1100
1101                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1102
1103                 Map<String, String> peerAddresses = new HashMap<>();
1104                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1105
1106                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1107                         MockRaftActor.props(persistenceId, peerAddresses,
1108                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1109
1110                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1111
1112                 leaderActor.getRaftActorContext().setCommitIndex(4);
1113                 leaderActor.getRaftActorContext().setLastApplied(4);
1114                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1115
1116                 leaderActor.waitForInitializeBehaviorComplete();
1117
1118                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1119
1120                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1121                 leaderActor.setCurrentBehavior(leader);
1122                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1123
1124                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1125                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1126
1127                 assertEquals(8, leaderActor.getReplicatedLog().size());
1128
1129                 leaderActor.getRaftActorContext().getSnapshotManager()
1130                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1131                                 new MockRaftActorContext.MockPayload("x")), 4);
1132
1133                 verify(leaderActor.delegate).createSnapshot();
1134
1135                 assertEquals(8, leaderActor.getReplicatedLog().size());
1136
1137                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1138                 //fake snapshot on index 5
1139                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1140
1141                 assertEquals(8, leaderActor.getReplicatedLog().size());
1142
1143                 //fake snapshot on index 6
1144                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1145                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1146                 assertEquals(8, leaderActor.getReplicatedLog().size());
1147
1148                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1149
1150                 assertEquals(8, leaderActor.getReplicatedLog().size());
1151
1152                 ByteString snapshotBytes = fromObject(Arrays.asList(
1153                         new MockRaftActorContext.MockPayload("foo-0"),
1154                         new MockRaftActorContext.MockPayload("foo-1"),
1155                         new MockRaftActorContext.MockPayload("foo-2"),
1156                         new MockRaftActorContext.MockPayload("foo-3"),
1157                         new MockRaftActorContext.MockPayload("foo-4")));
1158
1159                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1160                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1161
1162                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1163
1164                 // The commit is needed to complete the snapshot creation process
1165                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1166
1167                 // capture snapshot reply should remove the snapshotted entries only
1168                 assertEquals(3, leaderActor.getReplicatedLog().size());
1169                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1170
1171                 // add another non-replicated entry
1172                 leaderActor.getReplicatedLog().append(
1173                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1174
1175                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1176                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1177                 assertEquals(2, leaderActor.getReplicatedLog().size());
1178                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1179
1180             }
1181         };
1182     }
1183
1184     @Test
1185     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1186         new JavaTestKit(getSystem()) {
1187             {
1188                 String persistenceId = factory.generateActorId("follower-");
1189                 String leaderId = factory.generateActorId("leader-");
1190
1191
1192                 ActorRef leaderActor1 =
1193                         factory.createActor(Props.create(MessageCollectorActor.class));
1194
1195                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1196                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1197                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1198
1199                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1200
1201                 Map<String, String> peerAddresses = new HashMap<>();
1202                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1203
1204                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1205                         MockRaftActor.props(persistenceId, peerAddresses,
1206                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1207
1208                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1209                 followerActor.getRaftActorContext().setCommitIndex(4);
1210                 followerActor.getRaftActorContext().setLastApplied(4);
1211                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1212
1213                 followerActor.waitForInitializeBehaviorComplete();
1214
1215
1216                 Follower follower = new Follower(followerActor.getRaftActorContext());
1217                 followerActor.setCurrentBehavior(follower);
1218                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1219
1220                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1221                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1222                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1223
1224                 // log has indices 0-5
1225                 assertEquals(6, followerActor.getReplicatedLog().size());
1226
1227                 //snapshot on 4
1228                 followerActor.getRaftActorContext().getSnapshotManager().capture(
1229                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1230                                 new MockRaftActorContext.MockPayload("D")), 4);
1231
1232                 verify(followerActor.delegate).createSnapshot();
1233
1234                 assertEquals(6, followerActor.getReplicatedLog().size());
1235
1236                 //fake snapshot on index 6
1237                 List<ReplicatedLogEntry> entries =
1238                         Arrays.asList(
1239                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1240                                         new MockRaftActorContext.MockPayload("foo-6"))
1241                         );
1242                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1243                 assertEquals(7, followerActor.getReplicatedLog().size());
1244
1245                 //fake snapshot on index 7
1246                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1247
1248                 entries =
1249                         Arrays.asList(
1250                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1251                                         new MockRaftActorContext.MockPayload("foo-7"))
1252                         );
1253                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1254                 assertEquals(8, followerActor.getReplicatedLog().size());
1255
1256                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1257
1258
1259                 ByteString snapshotBytes = fromObject(Arrays.asList(
1260                         new MockRaftActorContext.MockPayload("foo-0"),
1261                         new MockRaftActorContext.MockPayload("foo-1"),
1262                         new MockRaftActorContext.MockPayload("foo-2"),
1263                         new MockRaftActorContext.MockPayload("foo-3"),
1264                         new MockRaftActorContext.MockPayload("foo-4")));
1265                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1266                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1267
1268                 // The commit is needed to complete the snapshot creation process
1269                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1270
1271                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1272                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1273                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1274
1275                 entries =
1276                         Arrays.asList(
1277                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1278                                         new MockRaftActorContext.MockPayload("foo-7"))
1279                         );
1280                 // send an additional entry 8 with leaderCommit = 7
1281                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1282
1283                 // 7 and 8, as lastapplied is 7
1284                 assertEquals(2, followerActor.getReplicatedLog().size());
1285
1286             }
1287         };
1288     }
1289
1290     @Test
1291     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1292         new JavaTestKit(getSystem()) {
1293             {
1294                 String persistenceId = factory.generateActorId("leader-");
1295                 String follower1Id = factory.generateActorId("follower-");
1296                 String follower2Id = factory.generateActorId("follower-");
1297
1298                 ActorRef followerActor1 =
1299                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1300                 ActorRef followerActor2 =
1301                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1302
1303                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1304                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1305                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1306
1307                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1308
1309                 Map<String, String> peerAddresses = new HashMap<>();
1310                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1311                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1312
1313                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1314                         MockRaftActor.props(persistenceId, peerAddresses,
1315                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1316
1317                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1318                 leaderActor.getRaftActorContext().setCommitIndex(9);
1319                 leaderActor.getRaftActorContext().setLastApplied(9);
1320                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1321
1322                 leaderActor.waitForInitializeBehaviorComplete();
1323
1324                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1325                 leaderActor.setCurrentBehavior(leader);
1326                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1327
1328                 // create 5 entries in the log
1329                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1330                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1331
1332                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1333                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1334                 //setting replicatedToAllIndex = 9, for the log to clear
1335                 leader.setReplicatedToAllIndex(9);
1336                 assertEquals(5, leaderActor.getReplicatedLog().size());
1337                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1338
1339                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1340                 assertEquals(5, leaderActor.getReplicatedLog().size());
1341                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1342
1343                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1344                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1345                 assertEquals(5, leaderActor.getReplicatedLog().size());
1346                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1347
1348                 // simulate a real snapshot
1349                 leaderActor.onReceiveCommand(new SendHeartBeat());
1350                 assertEquals(5, leaderActor.getReplicatedLog().size());
1351                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1352                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1353                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1354
1355
1356                 //reply from a slow follower does not initiate a fake snapshot
1357                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1358                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1359
1360                 ByteString snapshotBytes = fromObject(Arrays.asList(
1361                         new MockRaftActorContext.MockPayload("foo-0"),
1362                         new MockRaftActorContext.MockPayload("foo-1"),
1363                         new MockRaftActorContext.MockPayload("foo-2"),
1364                         new MockRaftActorContext.MockPayload("foo-3"),
1365                         new MockRaftActorContext.MockPayload("foo-4")));
1366                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1367                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1368
1369                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1370
1371                 //reply from a slow follower after should not raise errors
1372                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1373                 assertEquals(0, leaderActor.getReplicatedLog().size());
1374             }
1375         };
1376     }
1377
1378     @Test
1379     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1380         new JavaTestKit(getSystem()) {{
1381             String persistenceId = factory.generateActorId("leader-");
1382             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1383             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1384             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1385             config.setSnapshotBatchCount(5);
1386
1387             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1388
1389             Map<String, String> peerAddresses = new HashMap<>();
1390
1391             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1392                     MockRaftActor.props(persistenceId, peerAddresses,
1393                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1394
1395             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1396             leaderActor.getRaftActorContext().setCommitIndex(3);
1397             leaderActor.getRaftActorContext().setLastApplied(3);
1398             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1399
1400             leaderActor.waitForInitializeBehaviorComplete();
1401             for(int i=0;i< 4;i++) {
1402                 leaderActor.getReplicatedLog()
1403                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1404                                 new MockRaftActorContext.MockPayload("A")));
1405             }
1406
1407             Leader leader = new Leader(leaderActor.getRaftActorContext());
1408             leaderActor.setCurrentBehavior(leader);
1409             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1410
1411             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1412             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1413
1414             // Now send a CaptureSnapshotReply
1415             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1416
1417             // Trimming log in this scenario is a no-op
1418             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1419             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1420             assertEquals(-1, leader.getReplicatedToAllIndex());
1421
1422         }};
1423     }
1424
1425     @Test
1426     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1427         new JavaTestKit(getSystem()) {{
1428             String persistenceId = factory.generateActorId("leader-");
1429             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1430             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1431             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1432             config.setSnapshotBatchCount(5);
1433
1434             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1435
1436             Map<String, String> peerAddresses = new HashMap<>();
1437
1438             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1439                     MockRaftActor.props(persistenceId, peerAddresses,
1440                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1441
1442             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1443             leaderActor.getRaftActorContext().setCommitIndex(3);
1444             leaderActor.getRaftActorContext().setLastApplied(3);
1445             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1446             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1447
1448             leaderActor.waitForInitializeBehaviorComplete();
1449             Leader leader = new Leader(leaderActor.getRaftActorContext());
1450             leaderActor.setCurrentBehavior(leader);
1451             leader.setReplicatedToAllIndex(3);
1452             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1453
1454             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1455             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1456
1457             // Now send a CaptureSnapshotReply
1458             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1459
1460             // Trimming log in this scenario is a no-op
1461             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1462             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1463             assertEquals(3, leader.getReplicatedToAllIndex());
1464
1465         }};
1466     }
1467
1468     private ByteString fromObject(Object snapshot) throws Exception {
1469         ByteArrayOutputStream b = null;
1470         ObjectOutputStream o = null;
1471         try {
1472             b = new ByteArrayOutputStream();
1473             o = new ObjectOutputStream(b);
1474             o.writeObject(snapshot);
1475             byte[] snapshotBytes = b.toByteArray();
1476             return ByteString.copyFrom(snapshotBytes);
1477         } finally {
1478             if (o != null) {
1479                 o.flush();
1480                 o.close();
1481             }
1482             if (b != null) {
1483                 b.close();
1484             }
1485         }
1486     }
1487
1488 }