Merge "Set unable-to-connect(netconf node) if no sources are available"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RoleChanged;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
64 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
66 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
67 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
68 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
69 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
71 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
73 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.duration.Duration;
80 import scala.concurrent.duration.FiniteDuration;
81
82 public class RaftActorTest extends AbstractActorTest {
83
84     private TestActorFactory factory;
85
86     @Before
87     public void setUp(){
88         factory = new TestActorFactory(getSystem());
89     }
90
91     @After
92     public void tearDown() throws Exception {
93         factory.close();
94         InMemoryJournal.clear();
95         InMemorySnapshotStore.clear();
96     }
97
98     public static class MockRaftActor extends RaftActor {
99
100         protected DataPersistenceProvider dataPersistenceProvider;
101         private final RaftActor delegate;
102         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
103         private final List<Object> state;
104         private ActorRef roleChangeNotifier;
105         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
106
107         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
108             private static final long serialVersionUID = 1L;
109             private final Map<String, String> peerAddresses;
110             private final String id;
111             private final Optional<ConfigParams> config;
112             private final DataPersistenceProvider dataPersistenceProvider;
113             private final ActorRef roleChangeNotifier;
114
115             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
116                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
117                 ActorRef roleChangeNotifier) {
118                 this.peerAddresses = peerAddresses;
119                 this.id = id;
120                 this.config = config;
121                 this.dataPersistenceProvider = dataPersistenceProvider;
122                 this.roleChangeNotifier = roleChangeNotifier;
123             }
124
125             @Override
126             public MockRaftActor create() throws Exception {
127                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
128                     dataPersistenceProvider);
129                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
130                 return mockRaftActor;
131             }
132         }
133
134         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
135                              DataPersistenceProvider dataPersistenceProvider) {
136             super(id, peerAddresses, config);
137             state = new ArrayList<>();
138             this.delegate = mock(RaftActor.class);
139             if(dataPersistenceProvider == null){
140                 this.dataPersistenceProvider = new PersistentDataProvider();
141             } else {
142                 this.dataPersistenceProvider = dataPersistenceProvider;
143             }
144         }
145
146         public void waitForRecoveryComplete() {
147             try {
148                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
149             } catch (InterruptedException e) {
150                 e.printStackTrace();
151             }
152         }
153
154         public void waitForInitializeBehaviorComplete() {
155             try {
156                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
157             } catch (InterruptedException e) {
158                 e.printStackTrace();
159             }
160         }
161
162
163         public void waitUntilLeader(){
164             for(int i = 0;i < 10; i++){
165                 if(isLeader()){
166                     break;
167                 }
168                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
169             }
170         }
171
172         public List<Object> getState() {
173             return state;
174         }
175
176         public static Props props(final String id, final Map<String, String> peerAddresses,
177                 Optional<ConfigParams> config){
178             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
179         }
180
181         public static Props props(final String id, final Map<String, String> peerAddresses,
182                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
183             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
184         }
185
186         public static Props props(final String id, final Map<String, String> peerAddresses,
187             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
188             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
189         }
190
191         public static Props props(final String id, final Map<String, String> peerAddresses,
192                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
193                                   DataPersistenceProvider dataPersistenceProvider){
194             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
195         }
196
197
198         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
199             delegate.applyState(clientActor, identifier, data);
200             LOG.info("{}: applyState called", persistenceId());
201         }
202
203         @Override
204         protected void startLogRecoveryBatch(int maxBatchSize) {
205         }
206
207         @Override
208         protected void appendRecoveredLogEntry(Payload data) {
209             state.add(data);
210         }
211
212         @Override
213         protected void applyCurrentLogRecoveryBatch() {
214         }
215
216         @Override
217         protected void onRecoveryComplete() {
218             delegate.onRecoveryComplete();
219             recoveryComplete.countDown();
220         }
221
222         @Override
223         protected void initializeBehavior() {
224             super.initializeBehavior();
225             initializeBehaviorComplete.countDown();
226         }
227
228         @Override
229         protected void applyRecoverySnapshot(byte[] bytes) {
230             delegate.applyRecoverySnapshot(bytes);
231             try {
232                 Object data = toObject(bytes);
233                 if (data instanceof List) {
234                     state.addAll((List<?>) data);
235                 }
236             } catch (Exception e) {
237                 e.printStackTrace();
238             }
239         }
240
241         @Override protected void createSnapshot() {
242             LOG.info("{}: createSnapshot called", persistenceId());
243             delegate.createSnapshot();
244         }
245
246         @Override protected void applySnapshot(byte [] snapshot) {
247             LOG.info("{}: applySnapshot called", persistenceId());
248             delegate.applySnapshot(snapshot);
249         }
250
251         @Override protected void onStateChanged() {
252             delegate.onStateChanged();
253         }
254
255         @Override
256         protected DataPersistenceProvider persistence() {
257             return this.dataPersistenceProvider;
258         }
259
260         @Override
261         protected Optional<ActorRef> getRoleChangeNotifier() {
262             return Optional.fromNullable(roleChangeNotifier);
263         }
264
265         @Override public String persistenceId() {
266             return this.getId();
267         }
268
269         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
270             Object obj = null;
271             ByteArrayInputStream bis = null;
272             ObjectInputStream ois = null;
273             try {
274                 bis = new ByteArrayInputStream(bs);
275                 ois = new ObjectInputStream(bis);
276                 obj = ois.readObject();
277             } finally {
278                 if (bis != null) {
279                     bis.close();
280                 }
281                 if (ois != null) {
282                     ois.close();
283                 }
284             }
285             return obj;
286         }
287
288         public ReplicatedLog getReplicatedLog(){
289             return this.getRaftActorContext().getReplicatedLog();
290         }
291
292     }
293
294
295     public static class RaftActorTestKit extends JavaTestKit {
296         private final ActorRef raftActor;
297
298         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
299             super(actorSystem);
300
301             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
302                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
303
304         }
305
306
307         public ActorRef getRaftActor() {
308             return raftActor;
309         }
310
311         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
312             // Wait for a specific log message to show up
313             return
314                 new JavaTestKit.EventFilter<Boolean>(logEventClass
315                 ) {
316                     @Override
317                     protected Boolean run() {
318                         return true;
319                     }
320                 }.from(raftActor.path().toString())
321                     .message(message)
322                     .occurrences(1).exec();
323
324
325         }
326
327         protected void waitUntilLeader(){
328             waitUntilLeader(raftActor);
329         }
330
331         public static void waitUntilLeader(ActorRef actorRef) {
332             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
333             for(int i = 0; i < 20 * 5; i++) {
334                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
335                 try {
336                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
337                     if(resp.getLeaderActor() != null) {
338                         return;
339                     }
340                 } catch(TimeoutException e) {
341                 } catch(Exception e) {
342                     System.err.println("FindLeader threw ex");
343                     e.printStackTrace();
344                 }
345
346
347                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
348             }
349
350             Assert.fail("Leader not found for actorRef " + actorRef.path());
351         }
352
353     }
354
355
356     @Test
357     public void testConstruction() {
358         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
359     }
360
361     @Test
362     public void testFindLeaderWhenLeaderIsSelf(){
363         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
364         kit.waitUntilLeader();
365     }
366
367     @Test
368     public void testRaftActorRecovery() throws Exception {
369         new JavaTestKit(getSystem()) {{
370             String persistenceId = factory.generateActorId("follower-");
371
372             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
373             // Set the heartbeat interval high to essentially disable election otherwise the test
374             // may fail if the actor is switched to Leader and the commitIndex is set to the last
375             // log entry.
376             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
377
378             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
379                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
380
381             watch(followerActor);
382
383             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
384             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
385                     new MockRaftActorContext.MockPayload("E"));
386             snapshotUnappliedEntries.add(entry1);
387
388             int lastAppliedDuringSnapshotCapture = 3;
389             int lastIndexDuringSnapshotCapture = 4;
390
391             // 4 messages as part of snapshot, which are applied to state
392             ByteString snapshotBytes = fromObject(Arrays.asList(
393                     new MockRaftActorContext.MockPayload("A"),
394                     new MockRaftActorContext.MockPayload("B"),
395                     new MockRaftActorContext.MockPayload("C"),
396                     new MockRaftActorContext.MockPayload("D")));
397
398             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
399                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
400                     lastAppliedDuringSnapshotCapture, 1);
401             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
402
403             // add more entries after snapshot is taken
404             List<ReplicatedLogEntry> entries = new ArrayList<>();
405             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
406                     new MockRaftActorContext.MockPayload("F"));
407             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
408                     new MockRaftActorContext.MockPayload("G"));
409             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
410                     new MockRaftActorContext.MockPayload("H"));
411             entries.add(entry2);
412             entries.add(entry3);
413             entries.add(entry4);
414
415             int lastAppliedToState = 5;
416             int lastIndex = 7;
417
418             InMemoryJournal.addEntry(persistenceId, 5, entry2);
419             // 2 entries are applied to state besides the 4 entries in snapshot
420             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
421             InMemoryJournal.addEntry(persistenceId, 7, entry3);
422             InMemoryJournal.addEntry(persistenceId, 8, entry4);
423
424             // kill the actor
425             followerActor.tell(PoisonPill.getInstance(), null);
426             expectMsgClass(duration("5 seconds"), Terminated.class);
427
428             unwatch(followerActor);
429
430             //reinstate the actor
431             TestActorRef<MockRaftActor> ref = factory.createTestActor(
432                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
433                             Optional.<ConfigParams>of(config)));
434
435             ref.underlyingActor().waitForRecoveryComplete();
436
437             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
438             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
439                     context.getReplicatedLog().size());
440             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
441             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
442             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
443             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
444         }};
445     }
446
447     @Test
448     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
449         new JavaTestKit(getSystem()) {{
450             String persistenceId = factory.generateActorId("leader-");
451
452             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
453             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
454
455             // Setup the persisted journal with some entries
456             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
457                     new MockRaftActorContext.MockPayload("zero"));
458             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
459                     new MockRaftActorContext.MockPayload("oen"));
460             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
461                     new MockRaftActorContext.MockPayload("two"));
462
463             long seqNr = 1;
464             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
465             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
466             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
467             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
468
469             int lastAppliedToState = 1;
470             int lastIndex = 2;
471
472             //reinstate the actor
473             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
474                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
475                             Optional.<ConfigParams>of(config)));
476
477             leaderActor.underlyingActor().waitForRecoveryComplete();
478
479             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
480             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
481             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
482             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
483             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
484         }};
485     }
486
487     /**
488      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
489      * process recovery messages
490      *
491      * @throws Exception
492      */
493
494     @Test
495     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
496         new JavaTestKit(getSystem()) {
497             {
498                 String persistenceId = factory.generateActorId("leader-");
499
500                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
501
502                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
503
504                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
505                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
506
507                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
508
509                 // Wait for akka's recovery to complete so it doesn't interfere.
510                 mockRaftActor.waitForRecoveryComplete();
511
512                 ByteString snapshotBytes = fromObject(Arrays.asList(
513                         new MockRaftActorContext.MockPayload("A"),
514                         new MockRaftActorContext.MockPayload("B"),
515                         new MockRaftActorContext.MockPayload("C"),
516                         new MockRaftActorContext.MockPayload("D")));
517
518                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
519                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
520
521                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
522
523                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
524
525                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
526
527                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
528
529                 assertEquals("add replicated log entry", 1, replicatedLog.size());
530
531                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
532
533                 assertEquals("add replicated log entry", 2, replicatedLog.size());
534
535                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
536
537                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
538
539                 // The snapshot had 4 items + we added 2 more items during the test
540                 // We start removing from 5 and we should get 1 item in the replicated log
541                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
542
543                 assertEquals("remove log entries", 1, replicatedLog.size());
544
545                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
546
547                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
548                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
549
550                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
551
552             }};
553     }
554
555     /**
556      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
557      * not process recovery messages
558      *
559      * @throws Exception
560      */
561     @Test
562     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
563         new JavaTestKit(getSystem()) {
564             {
565                 String persistenceId = factory.generateActorId("leader-");
566
567                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
568
569                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
570
571                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
572                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
573
574                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
575
576                 // Wait for akka's recovery to complete so it doesn't interfere.
577                 mockRaftActor.waitForRecoveryComplete();
578
579                 ByteString snapshotBytes = fromObject(Arrays.asList(
580                         new MockRaftActorContext.MockPayload("A"),
581                         new MockRaftActorContext.MockPayload("B"),
582                         new MockRaftActorContext.MockPayload("C"),
583                         new MockRaftActorContext.MockPayload("D")));
584
585                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
586                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
587
588                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
589
590                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
591
592                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
593
594                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
595
596                 assertEquals("add replicated log entry", 0, replicatedLog.size());
597
598                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
599
600                 assertEquals("add replicated log entry", 0, replicatedLog.size());
601
602                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
603
604                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
605
606                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
607
608                 assertEquals("remove log entries", 0, replicatedLog.size());
609
610                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
611
612                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
613                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
614
615                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
616             }};
617     }
618
619
620     @Test
621     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
622         new JavaTestKit(getSystem()) {
623             {
624                 String persistenceId = factory.generateActorId("leader-");
625
626                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
627
628                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
629
630                 CountDownLatch persistLatch = new CountDownLatch(1);
631                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
632                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
633
634                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
635                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
636
637                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
638
639                 mockRaftActor.waitForInitializeBehaviorComplete();
640
641                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
642
643                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
644             }
645         };
646     }
647
648     @Test
649     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
650         new JavaTestKit(getSystem()) {
651             {
652                 String persistenceId = factory.generateActorId("leader-");
653
654                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
655
656                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
657
658                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
659
660                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
661                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
662
663                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
664
665                 mockRaftActor.waitForInitializeBehaviorComplete();
666
667                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
668
669                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
670
671                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
672             }
673         };
674     }
675
676     @Test
677     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
678         new JavaTestKit(getSystem()) {
679             {
680                 String persistenceId = factory.generateActorId("leader-");
681
682                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
683
684                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
685
686                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
687
688                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
689                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
690
691                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
692
693                 mockRaftActor.waitForInitializeBehaviorComplete();
694
695                 mockRaftActor.waitUntilLeader();
696
697                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
698
699                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
700
701                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
702             }
703         };
704     }
705
706     @Test
707     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
708         new JavaTestKit(getSystem()) {
709             {
710                 String persistenceId = factory.generateActorId("leader-");
711
712                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
713
714                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
715
716                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
717
718                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
719                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
720
721                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
722
723                 mockRaftActor.waitForInitializeBehaviorComplete();
724
725                 mockRaftActor.waitUntilLeader();
726
727                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
728
729                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
730
731             }
732
733         };
734     }
735
736     @Test
737     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
738         new JavaTestKit(getSystem()) {
739             {
740                 String persistenceId = factory.generateActorId("leader-");
741
742                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
743
744                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
745
746                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
747
748                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
749                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
750                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
751
752                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
753
754                 mockRaftActor.waitForInitializeBehaviorComplete();
755
756                 ByteString snapshotBytes = fromObject(Arrays.asList(
757                         new MockRaftActorContext.MockPayload("A"),
758                         new MockRaftActorContext.MockPayload("B"),
759                         new MockRaftActorContext.MockPayload("C"),
760                         new MockRaftActorContext.MockPayload("D")));
761
762                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
763
764                 raftActorContext.getSnapshotManager().capture(
765                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
766                                 new MockRaftActorContext.MockPayload("D")), -1);
767
768                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
769
770                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
771
772                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
773
774             }
775         };
776     }
777
778     @Test
779     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
780         new JavaTestKit(getSystem()) {
781             {
782                 String persistenceId = factory.generateActorId("leader-");
783
784                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
785
786                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787
788                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
789
790                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
791                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
792
793                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
794
795                 mockRaftActor.waitForInitializeBehaviorComplete();
796                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
797
798                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
799                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
800                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
801                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
802                 mockRaftActor.getReplicatedLog().append(lastEntry);
803
804                 ByteString snapshotBytes = fromObject(Arrays.asList(
805                         new MockRaftActorContext.MockPayload("A"),
806                         new MockRaftActorContext.MockPayload("B"),
807                         new MockRaftActorContext.MockPayload("C"),
808                         new MockRaftActorContext.MockPayload("D")));
809
810                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
811                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
812
813                 long replicatedToAllIndex = 1;
814
815                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
816
817                 verify(mockRaftActor.delegate).createSnapshot();
818
819                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
820
821                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
822
823                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
824
825                 verify(dataPersistenceProvider).deleteMessages(100);
826
827                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
828                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
829
830                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
831                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
832                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
833
834                 // Index 2 will not be in the log because it was removed due to snapshotting
835                 assertNull(mockRaftActor.getReplicatedLog().get(1));
836                 assertNull(mockRaftActor.getReplicatedLog().get(0));
837
838             }
839         };
840     }
841
842     @Test
843     public void testApplyState() throws Exception {
844
845         new JavaTestKit(getSystem()) {
846             {
847                 String persistenceId = factory.generateActorId("leader-");
848
849                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
850
851                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
852
853                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
854
855                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
856                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
857
858                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
859
860                 mockRaftActor.waitForInitializeBehaviorComplete();
861
862                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
863                         new MockRaftActorContext.MockPayload("F"));
864
865                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
866
867                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
868
869             }
870         };
871     }
872
873     @Test
874     public void testApplySnapshot() throws Exception {
875         new JavaTestKit(getSystem()) {
876             {
877                 String persistenceId = factory.generateActorId("leader-");
878
879                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
880
881                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
882
883                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
884
885                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
886                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
887
888                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
889
890                 mockRaftActor.waitForInitializeBehaviorComplete();
891
892                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
893
894                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
895                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
896                 oldReplicatedLog.append(
897                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
898                                 mock(Payload.class)));
899
900                 ByteString snapshotBytes = fromObject(Arrays.asList(
901                         new MockRaftActorContext.MockPayload("A"),
902                         new MockRaftActorContext.MockPayload("B"),
903                         new MockRaftActorContext.MockPayload("C"),
904                         new MockRaftActorContext.MockPayload("D")));
905
906                 Snapshot snapshot = mock(Snapshot.class);
907
908                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
909
910                 doReturn(3L).when(snapshot).getLastAppliedIndex();
911
912                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
913
914                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
915
916                 assertTrue("The replicatedLog should have changed",
917                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
918
919                 assertEquals("lastApplied should be same as in the snapshot",
920                         (Long) 3L, mockRaftActor.getLastApplied());
921
922                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
923
924             }
925         };
926     }
927
928     @Test
929     public void testSaveSnapshotFailure() throws Exception {
930         new JavaTestKit(getSystem()) {
931             {
932                 String persistenceId = factory.generateActorId("leader-");
933
934                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
935
936                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
937
938                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
939
940                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
941                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
942
943                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
944
945                 mockRaftActor.waitForInitializeBehaviorComplete();
946
947                 ByteString snapshotBytes = fromObject(Arrays.asList(
948                         new MockRaftActorContext.MockPayload("A"),
949                         new MockRaftActorContext.MockPayload("B"),
950                         new MockRaftActorContext.MockPayload("C"),
951                         new MockRaftActorContext.MockPayload("D")));
952
953                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
954
955                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
956
957                 raftActorContext.getSnapshotManager().capture(
958                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
959                                 new MockRaftActorContext.MockPayload("D")), 1);
960
961                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
962
963                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
964                         new Exception()));
965
966                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
967                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
968
969             }
970         };
971     }
972
973     @Test
974     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
975         new JavaTestKit(getSystem()) {{
976             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
977                     Props.create(MessageCollectorActor.class));
978             MessageCollectorActor.waitUntilReady(notifierActor);
979
980             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
981             long heartBeatInterval = 100;
982             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
983             config.setElectionTimeoutFactor(20);
984
985             String persistenceId = factory.generateActorId("notifier-");
986
987             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
988                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
989                     new NonPersistentProvider()), persistenceId);
990
991             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
992
993
994             // check if the notifier got a role change from null to Follower
995             RoleChanged raftRoleChanged = matches.get(0);
996             assertEquals(persistenceId, raftRoleChanged.getMemberId());
997             assertNull(raftRoleChanged.getOldRole());
998             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
999
1000             // check if the notifier got a role change from Follower to Candidate
1001             raftRoleChanged = matches.get(1);
1002             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1003             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1004             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1005
1006             // check if the notifier got a role change from Candidate to Leader
1007             raftRoleChanged = matches.get(2);
1008             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1009             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1010             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1011
1012             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1013                     notifierActor, LeaderStateChanged.class);
1014
1015             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1016
1017             notifierActor.underlyingActor().clear();
1018
1019             MockRaftActor raftActor = raftActorRef.underlyingActor();
1020             final String newLeaderId = "new-leader";
1021             Follower follower = new Follower(raftActor.getRaftActorContext()) {
1022                 @Override
1023                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1024                     leaderId = newLeaderId;
1025                     return this;
1026                 }
1027             };
1028
1029             raftActor.changeCurrentBehavior(follower);
1030
1031             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1032             assertEquals(persistenceId, leaderStateChange.getMemberId());
1033             assertEquals(null, leaderStateChange.getLeaderId());
1034
1035             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1036             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1037             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1038
1039             notifierActor.underlyingActor().clear();
1040
1041             raftActor.handleCommand("any");
1042
1043             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1044             assertEquals(persistenceId, leaderStateChange.getMemberId());
1045             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1046         }};
1047     }
1048
1049     @Test
1050     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1051         new JavaTestKit(getSystem()) {{
1052             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1053             MessageCollectorActor.waitUntilReady(notifierActor);
1054
1055             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1056             long heartBeatInterval = 100;
1057             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1058             config.setElectionTimeoutFactor(1);
1059
1060             String persistenceId = factory.generateActorId("notifier-");
1061
1062             factory.createActor(MockRaftActor.props(persistenceId,
1063                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1064
1065             List<RoleChanged> matches =  null;
1066             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1067                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1068                 assertNotNull(matches);
1069                 if(matches.size() == 3) {
1070                     break;
1071                 }
1072                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1073             }
1074
1075             assertEquals(2, matches.size());
1076
1077             // check if the notifier got a role change from null to Follower
1078             RoleChanged raftRoleChanged = matches.get(0);
1079             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1080             assertNull(raftRoleChanged.getOldRole());
1081             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1082
1083             // check if the notifier got a role change from Follower to Candidate
1084             raftRoleChanged = matches.get(1);
1085             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1086             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1087             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1088
1089         }};
1090     }
1091
1092     @Test
1093     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1094         new JavaTestKit(getSystem()) {
1095             {
1096                 String persistenceId = factory.generateActorId("leader-");
1097                 String follower1Id = factory.generateActorId("follower-");
1098
1099                 ActorRef followerActor1 =
1100                         factory.createActor(Props.create(MessageCollectorActor.class));
1101
1102                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1103                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1104                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1105
1106                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1107
1108                 Map<String, String> peerAddresses = new HashMap<>();
1109                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1110
1111                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1112                         MockRaftActor.props(persistenceId, peerAddresses,
1113                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1114
1115                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1116
1117                 leaderActor.getRaftActorContext().setCommitIndex(4);
1118                 leaderActor.getRaftActorContext().setLastApplied(4);
1119                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1120
1121                 leaderActor.waitForInitializeBehaviorComplete();
1122
1123                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1124
1125                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1126                 leaderActor.setCurrentBehavior(leader);
1127                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1128
1129                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1130                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1131
1132                 assertEquals(8, leaderActor.getReplicatedLog().size());
1133
1134                 leaderActor.getRaftActorContext().getSnapshotManager()
1135                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1136                                 new MockRaftActorContext.MockPayload("x")), 4);
1137
1138                 verify(leaderActor.delegate).createSnapshot();
1139
1140                 assertEquals(8, leaderActor.getReplicatedLog().size());
1141
1142                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1143                 //fake snapshot on index 5
1144                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1145
1146                 assertEquals(8, leaderActor.getReplicatedLog().size());
1147
1148                 //fake snapshot on index 6
1149                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1150                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1151                 assertEquals(8, leaderActor.getReplicatedLog().size());
1152
1153                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1154
1155                 assertEquals(8, leaderActor.getReplicatedLog().size());
1156
1157                 ByteString snapshotBytes = fromObject(Arrays.asList(
1158                         new MockRaftActorContext.MockPayload("foo-0"),
1159                         new MockRaftActorContext.MockPayload("foo-1"),
1160                         new MockRaftActorContext.MockPayload("foo-2"),
1161                         new MockRaftActorContext.MockPayload("foo-3"),
1162                         new MockRaftActorContext.MockPayload("foo-4")));
1163
1164                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
1165                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1166
1167                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1168
1169                 // The commit is needed to complete the snapshot creation process
1170                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
1171
1172                 // capture snapshot reply should remove the snapshotted entries only
1173                 assertEquals(3, leaderActor.getReplicatedLog().size());
1174                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1175
1176                 // add another non-replicated entry
1177                 leaderActor.getReplicatedLog().append(
1178                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1179
1180                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1181                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1182                 assertEquals(2, leaderActor.getReplicatedLog().size());
1183                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1184
1185             }
1186         };
1187     }
1188
1189     @Test
1190     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1191         new JavaTestKit(getSystem()) {
1192             {
1193                 String persistenceId = factory.generateActorId("follower-");
1194                 String leaderId = factory.generateActorId("leader-");
1195
1196
1197                 ActorRef leaderActor1 =
1198                         factory.createActor(Props.create(MessageCollectorActor.class));
1199
1200                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1201                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1202                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1203
1204                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1205
1206                 Map<String, String> peerAddresses = new HashMap<>();
1207                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1208
1209                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1210                         MockRaftActor.props(persistenceId, peerAddresses,
1211                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1212
1213                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1214                 followerActor.getRaftActorContext().setCommitIndex(4);
1215                 followerActor.getRaftActorContext().setLastApplied(4);
1216                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1217
1218                 followerActor.waitForInitializeBehaviorComplete();
1219
1220
1221                 Follower follower = new Follower(followerActor.getRaftActorContext());
1222                 followerActor.setCurrentBehavior(follower);
1223                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1224
1225                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1226                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1227                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1228
1229                 // log has indices 0-5
1230                 assertEquals(6, followerActor.getReplicatedLog().size());
1231
1232                 //snapshot on 4
1233                 followerActor.getRaftActorContext().getSnapshotManager().capture(
1234                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1235                                 new MockRaftActorContext.MockPayload("D")), 4);
1236
1237                 verify(followerActor.delegate).createSnapshot();
1238
1239                 assertEquals(6, followerActor.getReplicatedLog().size());
1240
1241                 //fake snapshot on index 6
1242                 List<ReplicatedLogEntry> entries =
1243                         Arrays.asList(
1244                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1245                                         new MockRaftActorContext.MockPayload("foo-6"))
1246                         );
1247                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1248                 assertEquals(7, followerActor.getReplicatedLog().size());
1249
1250                 //fake snapshot on index 7
1251                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1252
1253                 entries =
1254                         Arrays.asList(
1255                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1256                                         new MockRaftActorContext.MockPayload("foo-7"))
1257                         );
1258                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1259                 assertEquals(8, followerActor.getReplicatedLog().size());
1260
1261                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1262
1263
1264                 ByteString snapshotBytes = fromObject(Arrays.asList(
1265                         new MockRaftActorContext.MockPayload("foo-0"),
1266                         new MockRaftActorContext.MockPayload("foo-1"),
1267                         new MockRaftActorContext.MockPayload("foo-2"),
1268                         new MockRaftActorContext.MockPayload("foo-3"),
1269                         new MockRaftActorContext.MockPayload("foo-4")));
1270                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1271                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1272
1273                 // The commit is needed to complete the snapshot creation process
1274                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
1275
1276                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1277                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1278                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1279
1280                 entries =
1281                         Arrays.asList(
1282                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1283                                         new MockRaftActorContext.MockPayload("foo-7"))
1284                         );
1285                 // send an additional entry 8 with leaderCommit = 7
1286                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1287
1288                 // 7 and 8, as lastapplied is 7
1289                 assertEquals(2, followerActor.getReplicatedLog().size());
1290
1291             }
1292         };
1293     }
1294
1295     @Test
1296     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1297         new JavaTestKit(getSystem()) {
1298             {
1299                 String persistenceId = factory.generateActorId("leader-");
1300                 String follower1Id = factory.generateActorId("follower-");
1301                 String follower2Id = factory.generateActorId("follower-");
1302
1303                 ActorRef followerActor1 =
1304                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1305                 ActorRef followerActor2 =
1306                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1307
1308                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1309                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1310                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1311
1312                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1313
1314                 Map<String, String> peerAddresses = new HashMap<>();
1315                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1316                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1317
1318                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1319                         MockRaftActor.props(persistenceId, peerAddresses,
1320                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1321
1322                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1323                 leaderActor.getRaftActorContext().setCommitIndex(9);
1324                 leaderActor.getRaftActorContext().setLastApplied(9);
1325                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1326
1327                 leaderActor.waitForInitializeBehaviorComplete();
1328
1329                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1330                 leaderActor.setCurrentBehavior(leader);
1331                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1332
1333                 // create 5 entries in the log
1334                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1335                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1336
1337                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1338                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1339                 //setting replicatedToAllIndex = 9, for the log to clear
1340                 leader.setReplicatedToAllIndex(9);
1341                 assertEquals(5, leaderActor.getReplicatedLog().size());
1342                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1343
1344                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1345                 assertEquals(5, leaderActor.getReplicatedLog().size());
1346                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1347
1348                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1349                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1350                 assertEquals(5, leaderActor.getReplicatedLog().size());
1351                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1352
1353                 // simulate a real snapshot
1354                 leaderActor.onReceiveCommand(new SendHeartBeat());
1355                 assertEquals(5, leaderActor.getReplicatedLog().size());
1356                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1357                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1358                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1359
1360
1361                 //reply from a slow follower does not initiate a fake snapshot
1362                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1363                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1364
1365                 ByteString snapshotBytes = fromObject(Arrays.asList(
1366                         new MockRaftActorContext.MockPayload("foo-0"),
1367                         new MockRaftActorContext.MockPayload("foo-1"),
1368                         new MockRaftActorContext.MockPayload("foo-2"),
1369                         new MockRaftActorContext.MockPayload("foo-3"),
1370                         new MockRaftActorContext.MockPayload("foo-4")));
1371                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1372                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1373
1374                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1375
1376                 //reply from a slow follower after should not raise errors
1377                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1378                 assertEquals(0, leaderActor.getReplicatedLog().size());
1379             }
1380         };
1381     }
1382
1383
1384     private static class NonPersistentProvider implements DataPersistenceProvider {
1385         @Override
1386         public boolean isRecoveryApplicable() {
1387             return false;
1388         }
1389
1390         @Override
1391         public <T> void persist(T o, Procedure<T> procedure) {
1392             try {
1393                 procedure.apply(o);
1394             } catch (Exception e) {
1395                 e.printStackTrace();
1396             }
1397         }
1398
1399         @Override
1400         public void saveSnapshot(Object o) {
1401
1402         }
1403
1404         @Override
1405         public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1406
1407         }
1408
1409         @Override
1410         public void deleteMessages(long sequenceNumber) {
1411
1412         }
1413     }
1414
1415     @Test
1416     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1417         new JavaTestKit(getSystem()) {{
1418             String persistenceId = factory.generateActorId("leader-");
1419             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1420             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1421             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1422             config.setSnapshotBatchCount(5);
1423
1424             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1425
1426             Map<String, String> peerAddresses = new HashMap<>();
1427
1428             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1429                     MockRaftActor.props(persistenceId, peerAddresses,
1430                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1431
1432             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1433             leaderActor.getRaftActorContext().setCommitIndex(3);
1434             leaderActor.getRaftActorContext().setLastApplied(3);
1435             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1436
1437             leaderActor.waitForInitializeBehaviorComplete();
1438             for(int i=0;i< 4;i++) {
1439                 leaderActor.getReplicatedLog()
1440                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1441                                 new MockRaftActorContext.MockPayload("A")));
1442             }
1443
1444             Leader leader = new Leader(leaderActor.getRaftActorContext());
1445             leaderActor.setCurrentBehavior(leader);
1446             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1447
1448             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1449             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1450
1451             // Now send a CaptureSnapshotReply
1452             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1453
1454             // Trimming log in this scenario is a no-op
1455             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1456             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1457             assertEquals(-1, leader.getReplicatedToAllIndex());
1458
1459         }};
1460     }
1461
1462     @Test
1463     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1464         new JavaTestKit(getSystem()) {{
1465             String persistenceId = factory.generateActorId("leader-");
1466             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1467             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1468             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1469             config.setSnapshotBatchCount(5);
1470
1471             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1472
1473             Map<String, String> peerAddresses = new HashMap<>();
1474
1475             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1476                     MockRaftActor.props(persistenceId, peerAddresses,
1477                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1478
1479             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1480             leaderActor.getRaftActorContext().setCommitIndex(3);
1481             leaderActor.getRaftActorContext().setLastApplied(3);
1482             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1483             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1484
1485             leaderActor.waitForInitializeBehaviorComplete();
1486             Leader leader = new Leader(leaderActor.getRaftActorContext());
1487             leaderActor.setCurrentBehavior(leader);
1488             leader.setReplicatedToAllIndex(3);
1489             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1490
1491             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1492             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1493
1494             // Now send a CaptureSnapshotReply
1495             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1496
1497             // Trimming log in this scenario is a no-op
1498             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1499             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1500             assertEquals(3, leader.getReplicatedToAllIndex());
1501
1502         }};
1503     }
1504
1505     private ByteString fromObject(Object snapshot) throws Exception {
1506         ByteArrayOutputStream b = null;
1507         ObjectOutputStream o = null;
1508         try {
1509             b = new ByteArrayOutputStream();
1510             o = new ObjectOutputStream(b);
1511             o.writeObject(snapshot);
1512             byte[] snapshotBytes = b.toByteArray();
1513             return ByteString.copyFrom(snapshotBytes);
1514         } finally {
1515             if (o != null) {
1516                 o.flush();
1517                 o.close();
1518             }
1519             if (b != null) {
1520                 b.close();
1521             }
1522         }
1523     }
1524
1525 }