Merge "Verify BatchedModifications messages sent vs received"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.Nonnull;
53 import org.junit.After;
54 import org.junit.Assert;
55 import org.junit.Before;
56 import org.junit.Test;
57 import org.opendaylight.controller.cluster.DataPersistenceProvider;
58 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
59 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
60 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
61 import org.opendaylight.controller.cluster.notifications.RoleChanged;
62 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
67 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
69 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
70 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
71 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
72 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
73 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
74 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
75 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
78 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import scala.concurrent.Await;
81 import scala.concurrent.Future;
82 import scala.concurrent.duration.Duration;
83 import scala.concurrent.duration.FiniteDuration;
84
85 public class RaftActorTest extends AbstractActorTest {
86
87     private TestActorFactory factory;
88
89     @Before
90     public void setUp(){
91         factory = new TestActorFactory(getSystem());
92     }
93
94     @After
95     public void tearDown() throws Exception {
96         factory.close();
97         InMemoryJournal.clear();
98         InMemorySnapshotStore.clear();
99     }
100
101     public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
102
103         private final RaftActor actorDelegate;
104         private final RaftActorRecoveryCohort cohortDelegate;
105         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
106         private final List<Object> state;
107         private ActorRef roleChangeNotifier;
108         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
109
110         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
111             private static final long serialVersionUID = 1L;
112             private final Map<String, String> peerAddresses;
113             private final String id;
114             private final Optional<ConfigParams> config;
115             private final DataPersistenceProvider dataPersistenceProvider;
116             private final ActorRef roleChangeNotifier;
117
118             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
119                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
120                 ActorRef roleChangeNotifier) {
121                 this.peerAddresses = peerAddresses;
122                 this.id = id;
123                 this.config = config;
124                 this.dataPersistenceProvider = dataPersistenceProvider;
125                 this.roleChangeNotifier = roleChangeNotifier;
126             }
127
128             @Override
129             public MockRaftActor create() throws Exception {
130                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
131                     dataPersistenceProvider);
132                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
133                 return mockRaftActor;
134             }
135         }
136
137         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
138                              DataPersistenceProvider dataPersistenceProvider) {
139             super(id, peerAddresses, config);
140             state = new ArrayList<>();
141             this.actorDelegate = mock(RaftActor.class);
142             this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
143             if(dataPersistenceProvider == null){
144                 setPersistence(true);
145             } else {
146                 setPersistence(dataPersistenceProvider);
147             }
148         }
149
150         public void waitForRecoveryComplete() {
151             try {
152                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
153             } catch (InterruptedException e) {
154                 e.printStackTrace();
155             }
156         }
157
158         public void waitForInitializeBehaviorComplete() {
159             try {
160                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
161             } catch (InterruptedException e) {
162                 e.printStackTrace();
163             }
164         }
165
166
167         public void waitUntilLeader(){
168             for(int i = 0;i < 10; i++){
169                 if(isLeader()){
170                     break;
171                 }
172                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
173             }
174         }
175
176         public List<Object> getState() {
177             return state;
178         }
179
180         public static Props props(final String id, final Map<String, String> peerAddresses,
181                 Optional<ConfigParams> config){
182             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
183         }
184
185         public static Props props(final String id, final Map<String, String> peerAddresses,
186                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
187             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
188         }
189
190         public static Props props(final String id, final Map<String, String> peerAddresses,
191             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
192             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
193         }
194
195         public static Props props(final String id, final Map<String, String> peerAddresses,
196                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
197                                   DataPersistenceProvider dataPersistenceProvider){
198             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
199         }
200
201
202         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
203             actorDelegate.applyState(clientActor, identifier, data);
204             LOG.info("{}: applyState called", persistenceId());
205         }
206
207         @Override
208         @Nonnull
209         protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
210             return this;
211         }
212
213         @Override
214         public void startLogRecoveryBatch(int maxBatchSize) {
215         }
216
217         @Override
218         public void appendRecoveredLogEntry(Payload data) {
219             state.add(data);
220         }
221
222         @Override
223         public void applyCurrentLogRecoveryBatch() {
224         }
225
226         @Override
227         protected void onRecoveryComplete() {
228             actorDelegate.onRecoveryComplete();
229             recoveryComplete.countDown();
230         }
231
232         @Override
233         protected void initializeBehavior() {
234             super.initializeBehavior();
235             initializeBehaviorComplete.countDown();
236         }
237
238         @Override
239         public void applyRecoverySnapshot(byte[] bytes) {
240             cohortDelegate.applyRecoverySnapshot(bytes);
241             try {
242                 Object data = toObject(bytes);
243                 if (data instanceof List) {
244                     state.addAll((List<?>) data);
245                 }
246             } catch (Exception e) {
247                 e.printStackTrace();
248             }
249         }
250
251         @Override protected void createSnapshot() {
252             LOG.info("{}: createSnapshot called", persistenceId());
253             actorDelegate.createSnapshot();
254         }
255
256         @Override protected void applySnapshot(byte [] snapshot) {
257             LOG.info("{}: applySnapshot called", persistenceId());
258             actorDelegate.applySnapshot(snapshot);
259         }
260
261         @Override protected void onStateChanged() {
262             actorDelegate.onStateChanged();
263         }
264
265         @Override
266         protected Optional<ActorRef> getRoleChangeNotifier() {
267             return Optional.fromNullable(roleChangeNotifier);
268         }
269
270         @Override public String persistenceId() {
271             return this.getId();
272         }
273
274         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
275             Object obj = null;
276             ByteArrayInputStream bis = null;
277             ObjectInputStream ois = null;
278             try {
279                 bis = new ByteArrayInputStream(bs);
280                 ois = new ObjectInputStream(bis);
281                 obj = ois.readObject();
282             } finally {
283                 if (bis != null) {
284                     bis.close();
285                 }
286                 if (ois != null) {
287                     ois.close();
288                 }
289             }
290             return obj;
291         }
292
293         public ReplicatedLog getReplicatedLog(){
294             return this.getRaftActorContext().getReplicatedLog();
295         }
296
297     }
298
299
300     public static class RaftActorTestKit extends JavaTestKit {
301         private final ActorRef raftActor;
302
303         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
304             super(actorSystem);
305
306             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
307                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
308
309         }
310
311
312         public ActorRef getRaftActor() {
313             return raftActor;
314         }
315
316         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
317             // Wait for a specific log message to show up
318             return
319                 new JavaTestKit.EventFilter<Boolean>(logEventClass
320                 ) {
321                     @Override
322                     protected Boolean run() {
323                         return true;
324                     }
325                 }.from(raftActor.path().toString())
326                     .message(message)
327                     .occurrences(1).exec();
328
329
330         }
331
332         protected void waitUntilLeader(){
333             waitUntilLeader(raftActor);
334         }
335
336         public static void waitUntilLeader(ActorRef actorRef) {
337             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
338             for(int i = 0; i < 20 * 5; i++) {
339                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
340                 try {
341                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
342                     if(resp.getLeaderActor() != null) {
343                         return;
344                     }
345                 } catch(TimeoutException e) {
346                 } catch(Exception e) {
347                     System.err.println("FindLeader threw ex");
348                     e.printStackTrace();
349                 }
350
351
352                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
353             }
354
355             Assert.fail("Leader not found for actorRef " + actorRef.path());
356         }
357
358     }
359
360
361     @Test
362     public void testConstruction() {
363         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
364     }
365
366     @Test
367     public void testFindLeaderWhenLeaderIsSelf(){
368         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
369         kit.waitUntilLeader();
370     }
371
372     @Test
373     public void testRaftActorRecovery() throws Exception {
374         new JavaTestKit(getSystem()) {{
375             String persistenceId = factory.generateActorId("follower-");
376
377             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
378             // Set the heartbeat interval high to essentially disable election otherwise the test
379             // may fail if the actor is switched to Leader and the commitIndex is set to the last
380             // log entry.
381             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
382
383             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
384                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
385
386             watch(followerActor);
387
388             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
389             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
390                     new MockRaftActorContext.MockPayload("E"));
391             snapshotUnappliedEntries.add(entry1);
392
393             int lastAppliedDuringSnapshotCapture = 3;
394             int lastIndexDuringSnapshotCapture = 4;
395
396             // 4 messages as part of snapshot, which are applied to state
397             ByteString snapshotBytes = fromObject(Arrays.asList(
398                     new MockRaftActorContext.MockPayload("A"),
399                     new MockRaftActorContext.MockPayload("B"),
400                     new MockRaftActorContext.MockPayload("C"),
401                     new MockRaftActorContext.MockPayload("D")));
402
403             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
404                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
405                     lastAppliedDuringSnapshotCapture, 1);
406             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
407
408             // add more entries after snapshot is taken
409             List<ReplicatedLogEntry> entries = new ArrayList<>();
410             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
411                     new MockRaftActorContext.MockPayload("F"));
412             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
413                     new MockRaftActorContext.MockPayload("G"));
414             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
415                     new MockRaftActorContext.MockPayload("H"));
416             entries.add(entry2);
417             entries.add(entry3);
418             entries.add(entry4);
419
420             int lastAppliedToState = 5;
421             int lastIndex = 7;
422
423             InMemoryJournal.addEntry(persistenceId, 5, entry2);
424             // 2 entries are applied to state besides the 4 entries in snapshot
425             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
426             InMemoryJournal.addEntry(persistenceId, 7, entry3);
427             InMemoryJournal.addEntry(persistenceId, 8, entry4);
428
429             // kill the actor
430             followerActor.tell(PoisonPill.getInstance(), null);
431             expectMsgClass(duration("5 seconds"), Terminated.class);
432
433             unwatch(followerActor);
434
435             //reinstate the actor
436             TestActorRef<MockRaftActor> ref = factory.createTestActor(
437                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
438                             Optional.<ConfigParams>of(config)));
439
440             ref.underlyingActor().waitForRecoveryComplete();
441
442             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
443             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
444                     context.getReplicatedLog().size());
445             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
446             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
447             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
448             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
449         }};
450     }
451
452     @Test
453     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
454         new JavaTestKit(getSystem()) {{
455             String persistenceId = factory.generateActorId("leader-");
456
457             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
458             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
459
460             // Setup the persisted journal with some entries
461             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
462                     new MockRaftActorContext.MockPayload("zero"));
463             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
464                     new MockRaftActorContext.MockPayload("oen"));
465             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
466                     new MockRaftActorContext.MockPayload("two"));
467
468             long seqNr = 1;
469             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
470             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
471             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
472             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
473
474             int lastAppliedToState = 1;
475             int lastIndex = 2;
476
477             //reinstate the actor
478             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
479                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
480                             Optional.<ConfigParams>of(config)));
481
482             leaderActor.underlyingActor().waitForRecoveryComplete();
483
484             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
485             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
486             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
487             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
488             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
489         }};
490     }
491
492     /**
493      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
494      * process recovery messages
495      *
496      * @throws Exception
497      */
498
499     @Test
500     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
501         new JavaTestKit(getSystem()) {
502             {
503                 String persistenceId = factory.generateActorId("leader-");
504
505                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
506
507                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
508
509                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
510                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
511
512                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
513
514                 // Wait for akka's recovery to complete so it doesn't interfere.
515                 mockRaftActor.waitForRecoveryComplete();
516
517                 ByteString snapshotBytes = fromObject(Arrays.asList(
518                         new MockRaftActorContext.MockPayload("A"),
519                         new MockRaftActorContext.MockPayload("B"),
520                         new MockRaftActorContext.MockPayload("C"),
521                         new MockRaftActorContext.MockPayload("D")));
522
523                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
524                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
525
526                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
527
528                 verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
529
530                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
531
532                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
533
534                 assertEquals("add replicated log entry", 1, replicatedLog.size());
535
536                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
537
538                 assertEquals("add replicated log entry", 2, replicatedLog.size());
539
540                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
541
542                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
543
544                 // The snapshot had 4 items + we added 2 more items during the test
545                 // We start removing from 5 and we should get 1 item in the replicated log
546                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
547
548                 assertEquals("remove log entries", 1, replicatedLog.size());
549
550                 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
551
552                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
553                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
554
555                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
556
557             }};
558     }
559
560     /**
561      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
562      * not process recovery messages
563      *
564      * @throws Exception
565      */
566     @Test
567     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
568         new JavaTestKit(getSystem()) {
569             {
570                 String persistenceId = factory.generateActorId("leader-");
571
572                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
573
574                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
575
576                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
577                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
578
579                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
580
581                 // Wait for akka's recovery to complete so it doesn't interfere.
582                 mockRaftActor.waitForRecoveryComplete();
583
584                 ByteString snapshotBytes = fromObject(Arrays.asList(
585                         new MockRaftActorContext.MockPayload("A"),
586                         new MockRaftActorContext.MockPayload("B"),
587                         new MockRaftActorContext.MockPayload("C"),
588                         new MockRaftActorContext.MockPayload("D")));
589
590                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
591                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
592
593                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
594
595                 verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
596
597                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
598
599                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
600
601                 assertEquals("add replicated log entry", 0, replicatedLog.size());
602
603                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
604
605                 assertEquals("add replicated log entry", 0, replicatedLog.size());
606
607                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
608
609                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
610
611                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
612
613                 assertEquals("remove log entries", 0, replicatedLog.size());
614
615                 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
616
617                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
618                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
619
620                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
621             }};
622     }
623
624
625     @Test
626     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
627         new JavaTestKit(getSystem()) {
628             {
629                 String persistenceId = factory.generateActorId("leader-");
630
631                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
632
633                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
634
635                 CountDownLatch persistLatch = new CountDownLatch(1);
636                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
637                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
638
639                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
640                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
641
642                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
643
644                 mockRaftActor.waitForInitializeBehaviorComplete();
645
646                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
647
648                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
649             }
650         };
651     }
652
653     @Test
654     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
655         new JavaTestKit(getSystem()) {
656             {
657                 String persistenceId = factory.generateActorId("leader-");
658
659                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
660
661                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
662
663                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
664
665                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
666                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
667
668                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
669
670                 mockRaftActor.waitForInitializeBehaviorComplete();
671
672                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
673
674                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
675
676                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
677             }
678         };
679     }
680
681     @Test
682     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
683         new JavaTestKit(getSystem()) {
684             {
685                 String persistenceId = factory.generateActorId("leader-");
686
687                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
688
689                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
690
691                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
692
693                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
694                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
695
696                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
697
698                 mockRaftActor.waitForInitializeBehaviorComplete();
699
700                 mockRaftActor.waitUntilLeader();
701
702                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
703
704                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
705
706                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
707             }
708         };
709     }
710
711     @Test
712     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
713         new JavaTestKit(getSystem()) {
714             {
715                 String persistenceId = factory.generateActorId("leader-");
716
717                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
718
719                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
720
721                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
722
723                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
724                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
725
726                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
727
728                 mockRaftActor.waitForInitializeBehaviorComplete();
729
730                 mockRaftActor.waitUntilLeader();
731
732                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
733
734                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
735
736             }
737
738         };
739     }
740
741     @Test
742     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
743         new JavaTestKit(getSystem()) {
744             {
745                 String persistenceId = factory.generateActorId("leader-");
746
747                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
748
749                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
750
751                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
752
753                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
754                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
755                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
756
757                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
758
759                 mockRaftActor.waitForInitializeBehaviorComplete();
760
761                 ByteString snapshotBytes = fromObject(Arrays.asList(
762                         new MockRaftActorContext.MockPayload("A"),
763                         new MockRaftActorContext.MockPayload("B"),
764                         new MockRaftActorContext.MockPayload("C"),
765                         new MockRaftActorContext.MockPayload("D")));
766
767                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
768
769                 raftActorContext.getSnapshotManager().capture(
770                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
771                                 new MockRaftActorContext.MockPayload("D")), -1);
772
773                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
774
775                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
776
777                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
778
779             }
780         };
781     }
782
783     @Test
784     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
785         new JavaTestKit(getSystem()) {
786             {
787                 String persistenceId = factory.generateActorId("leader-");
788
789                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
790
791                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
792
793                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
794
795                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
796                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
797
798                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
799
800                 mockRaftActor.waitForInitializeBehaviorComplete();
801                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
802
803                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
804                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
805                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
806                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
807                 mockRaftActor.getReplicatedLog().append(lastEntry);
808
809                 ByteString snapshotBytes = fromObject(Arrays.asList(
810                         new MockRaftActorContext.MockPayload("A"),
811                         new MockRaftActorContext.MockPayload("B"),
812                         new MockRaftActorContext.MockPayload("C"),
813                         new MockRaftActorContext.MockPayload("D")));
814
815                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
816                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
817
818                 long replicatedToAllIndex = 1;
819
820                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
821
822                 verify(mockRaftActor.actorDelegate).createSnapshot();
823
824                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
825
826                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
827
828                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
829
830                 verify(dataPersistenceProvider).deleteMessages(100);
831
832                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
833                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
834
835                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
836                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
837                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
838
839                 // Index 2 will not be in the log because it was removed due to snapshotting
840                 assertNull(mockRaftActor.getReplicatedLog().get(1));
841                 assertNull(mockRaftActor.getReplicatedLog().get(0));
842
843             }
844         };
845     }
846
847     @Test
848     public void testApplyState() throws Exception {
849
850         new JavaTestKit(getSystem()) {
851             {
852                 String persistenceId = factory.generateActorId("leader-");
853
854                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
855
856                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
857
858                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
859
860                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
861                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
862
863                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
864
865                 mockRaftActor.waitForInitializeBehaviorComplete();
866
867                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
868                         new MockRaftActorContext.MockPayload("F"));
869
870                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
871
872                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
873
874             }
875         };
876     }
877
878     @Test
879     public void testApplySnapshot() throws Exception {
880         new JavaTestKit(getSystem()) {
881             {
882                 String persistenceId = factory.generateActorId("leader-");
883
884                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
885
886                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
887
888                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
889
890                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
891                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
892
893                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
894
895                 mockRaftActor.waitForInitializeBehaviorComplete();
896
897                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
898
899                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
900                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
901                 oldReplicatedLog.append(
902                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
903                                 mock(Payload.class)));
904
905                 ByteString snapshotBytes = fromObject(Arrays.asList(
906                         new MockRaftActorContext.MockPayload("A"),
907                         new MockRaftActorContext.MockPayload("B"),
908                         new MockRaftActorContext.MockPayload("C"),
909                         new MockRaftActorContext.MockPayload("D")));
910
911                 Snapshot snapshot = mock(Snapshot.class);
912
913                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
914
915                 doReturn(3L).when(snapshot).getLastAppliedIndex();
916
917                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
918
919                 verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
920
921                 assertTrue("The replicatedLog should have changed",
922                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
923
924                 assertEquals("lastApplied should be same as in the snapshot",
925                         (Long) 3L, mockRaftActor.getLastApplied());
926
927                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
928
929             }
930         };
931     }
932
933     @Test
934     public void testSaveSnapshotFailure() throws Exception {
935         new JavaTestKit(getSystem()) {
936             {
937                 String persistenceId = factory.generateActorId("leader-");
938
939                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
940
941                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
942
943                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
944
945                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
946                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
947
948                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
949
950                 mockRaftActor.waitForInitializeBehaviorComplete();
951
952                 ByteString snapshotBytes = fromObject(Arrays.asList(
953                         new MockRaftActorContext.MockPayload("A"),
954                         new MockRaftActorContext.MockPayload("B"),
955                         new MockRaftActorContext.MockPayload("C"),
956                         new MockRaftActorContext.MockPayload("D")));
957
958                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
959
960                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
961
962                 raftActorContext.getSnapshotManager().capture(
963                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
964                                 new MockRaftActorContext.MockPayload("D")), 1);
965
966                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
967
968                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
969                         new Exception()));
970
971                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
972                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
973
974             }
975         };
976     }
977
978     @Test
979     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
980         new JavaTestKit(getSystem()) {{
981             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
982                     Props.create(MessageCollectorActor.class));
983             MessageCollectorActor.waitUntilReady(notifierActor);
984
985             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
986             long heartBeatInterval = 100;
987             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
988             config.setElectionTimeoutFactor(20);
989
990             String persistenceId = factory.generateActorId("notifier-");
991
992             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
993                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
994                     new NonPersistentDataProvider()), persistenceId);
995
996             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
997
998
999             // check if the notifier got a role change from null to Follower
1000             RoleChanged raftRoleChanged = matches.get(0);
1001             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1002             assertNull(raftRoleChanged.getOldRole());
1003             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1004
1005             // check if the notifier got a role change from Follower to Candidate
1006             raftRoleChanged = matches.get(1);
1007             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1008             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1009             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1010
1011             // check if the notifier got a role change from Candidate to Leader
1012             raftRoleChanged = matches.get(2);
1013             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1014             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1015             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1016
1017             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1018                     notifierActor, LeaderStateChanged.class);
1019
1020             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1021
1022             notifierActor.underlyingActor().clear();
1023
1024             MockRaftActor raftActor = raftActorRef.underlyingActor();
1025             final String newLeaderId = "new-leader";
1026             Follower follower = new Follower(raftActor.getRaftActorContext()) {
1027                 @Override
1028                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1029                     leaderId = newLeaderId;
1030                     return this;
1031                 }
1032             };
1033
1034             raftActor.changeCurrentBehavior(follower);
1035
1036             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1037             assertEquals(persistenceId, leaderStateChange.getMemberId());
1038             assertEquals(null, leaderStateChange.getLeaderId());
1039
1040             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1041             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1042             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1043
1044             notifierActor.underlyingActor().clear();
1045
1046             raftActor.handleCommand("any");
1047
1048             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1049             assertEquals(persistenceId, leaderStateChange.getMemberId());
1050             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1051         }};
1052     }
1053
1054     @Test
1055     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1056         new JavaTestKit(getSystem()) {{
1057             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1058             MessageCollectorActor.waitUntilReady(notifierActor);
1059
1060             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1061             long heartBeatInterval = 100;
1062             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1063             config.setElectionTimeoutFactor(1);
1064
1065             String persistenceId = factory.generateActorId("notifier-");
1066
1067             factory.createActor(MockRaftActor.props(persistenceId,
1068                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1069
1070             List<RoleChanged> matches =  null;
1071             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1072                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1073                 assertNotNull(matches);
1074                 if(matches.size() == 3) {
1075                     break;
1076                 }
1077                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1078             }
1079
1080             assertEquals(2, matches.size());
1081
1082             // check if the notifier got a role change from null to Follower
1083             RoleChanged raftRoleChanged = matches.get(0);
1084             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1085             assertNull(raftRoleChanged.getOldRole());
1086             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1087
1088             // check if the notifier got a role change from Follower to Candidate
1089             raftRoleChanged = matches.get(1);
1090             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1091             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1092             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1093
1094         }};
1095     }
1096
1097     @Test
1098     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1099         new JavaTestKit(getSystem()) {
1100             {
1101                 String persistenceId = factory.generateActorId("leader-");
1102                 String follower1Id = factory.generateActorId("follower-");
1103
1104                 ActorRef followerActor1 =
1105                         factory.createActor(Props.create(MessageCollectorActor.class));
1106
1107                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1108                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1109                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1110
1111                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1112
1113                 Map<String, String> peerAddresses = new HashMap<>();
1114                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1115
1116                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1117                         MockRaftActor.props(persistenceId, peerAddresses,
1118                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1119
1120                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1121
1122                 leaderActor.getRaftActorContext().setCommitIndex(4);
1123                 leaderActor.getRaftActorContext().setLastApplied(4);
1124                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1125
1126                 leaderActor.waitForInitializeBehaviorComplete();
1127
1128                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1129
1130                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1131                 leaderActor.setCurrentBehavior(leader);
1132                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1133
1134                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1135                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1136
1137                 assertEquals(8, leaderActor.getReplicatedLog().size());
1138
1139                 leaderActor.getRaftActorContext().getSnapshotManager()
1140                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1141                                 new MockRaftActorContext.MockPayload("x")), 4);
1142
1143                 verify(leaderActor.actorDelegate).createSnapshot();
1144
1145                 assertEquals(8, leaderActor.getReplicatedLog().size());
1146
1147                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1148                 //fake snapshot on index 5
1149                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1150
1151                 assertEquals(8, leaderActor.getReplicatedLog().size());
1152
1153                 //fake snapshot on index 6
1154                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1155                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1156                 assertEquals(8, leaderActor.getReplicatedLog().size());
1157
1158                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1159
1160                 assertEquals(8, leaderActor.getReplicatedLog().size());
1161
1162                 ByteString snapshotBytes = fromObject(Arrays.asList(
1163                         new MockRaftActorContext.MockPayload("foo-0"),
1164                         new MockRaftActorContext.MockPayload("foo-1"),
1165                         new MockRaftActorContext.MockPayload("foo-2"),
1166                         new MockRaftActorContext.MockPayload("foo-3"),
1167                         new MockRaftActorContext.MockPayload("foo-4")));
1168
1169                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1170                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1171
1172                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1173
1174                 // The commit is needed to complete the snapshot creation process
1175                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1176
1177                 // capture snapshot reply should remove the snapshotted entries only
1178                 assertEquals(3, leaderActor.getReplicatedLog().size());
1179                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1180
1181                 // add another non-replicated entry
1182                 leaderActor.getReplicatedLog().append(
1183                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1184
1185                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1186                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1187                 assertEquals(2, leaderActor.getReplicatedLog().size());
1188                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1189
1190             }
1191         };
1192     }
1193
1194     @Test
1195     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1196         new JavaTestKit(getSystem()) {
1197             {
1198                 String persistenceId = factory.generateActorId("follower-");
1199                 String leaderId = factory.generateActorId("leader-");
1200
1201
1202                 ActorRef leaderActor1 =
1203                         factory.createActor(Props.create(MessageCollectorActor.class));
1204
1205                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1206                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1207                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1208
1209                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1210
1211                 Map<String, String> peerAddresses = new HashMap<>();
1212                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1213
1214                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1215                         MockRaftActor.props(persistenceId, peerAddresses,
1216                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1217
1218                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1219                 followerActor.getRaftActorContext().setCommitIndex(4);
1220                 followerActor.getRaftActorContext().setLastApplied(4);
1221                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1222
1223                 followerActor.waitForInitializeBehaviorComplete();
1224
1225
1226                 Follower follower = new Follower(followerActor.getRaftActorContext());
1227                 followerActor.setCurrentBehavior(follower);
1228                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1229
1230                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1231                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1232                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1233
1234                 // log has indices 0-5
1235                 assertEquals(6, followerActor.getReplicatedLog().size());
1236
1237                 //snapshot on 4
1238                 followerActor.getRaftActorContext().getSnapshotManager().capture(
1239                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1240                                 new MockRaftActorContext.MockPayload("D")), 4);
1241
1242                 verify(followerActor.actorDelegate).createSnapshot();
1243
1244                 assertEquals(6, followerActor.getReplicatedLog().size());
1245
1246                 //fake snapshot on index 6
1247                 List<ReplicatedLogEntry> entries =
1248                         Arrays.asList(
1249                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1250                                         new MockRaftActorContext.MockPayload("foo-6"))
1251                         );
1252                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1253                 assertEquals(7, followerActor.getReplicatedLog().size());
1254
1255                 //fake snapshot on index 7
1256                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1257
1258                 entries =
1259                         Arrays.asList(
1260                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1261                                         new MockRaftActorContext.MockPayload("foo-7"))
1262                         );
1263                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1264                 assertEquals(8, followerActor.getReplicatedLog().size());
1265
1266                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1267
1268
1269                 ByteString snapshotBytes = fromObject(Arrays.asList(
1270                         new MockRaftActorContext.MockPayload("foo-0"),
1271                         new MockRaftActorContext.MockPayload("foo-1"),
1272                         new MockRaftActorContext.MockPayload("foo-2"),
1273                         new MockRaftActorContext.MockPayload("foo-3"),
1274                         new MockRaftActorContext.MockPayload("foo-4")));
1275                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1276                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1277
1278                 // The commit is needed to complete the snapshot creation process
1279                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1280
1281                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1282                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1283                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1284
1285                 entries =
1286                         Arrays.asList(
1287                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1288                                         new MockRaftActorContext.MockPayload("foo-7"))
1289                         );
1290                 // send an additional entry 8 with leaderCommit = 7
1291                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1292
1293                 // 7 and 8, as lastapplied is 7
1294                 assertEquals(2, followerActor.getReplicatedLog().size());
1295
1296             }
1297         };
1298     }
1299
1300     @Test
1301     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1302         new JavaTestKit(getSystem()) {
1303             {
1304                 String persistenceId = factory.generateActorId("leader-");
1305                 String follower1Id = factory.generateActorId("follower-");
1306                 String follower2Id = factory.generateActorId("follower-");
1307
1308                 ActorRef followerActor1 =
1309                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1310                 ActorRef followerActor2 =
1311                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1312
1313                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1314                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1315                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1316
1317                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1318
1319                 Map<String, String> peerAddresses = new HashMap<>();
1320                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1321                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1322
1323                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1324                         MockRaftActor.props(persistenceId, peerAddresses,
1325                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1326
1327                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1328                 leaderActor.getRaftActorContext().setCommitIndex(9);
1329                 leaderActor.getRaftActorContext().setLastApplied(9);
1330                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1331
1332                 leaderActor.waitForInitializeBehaviorComplete();
1333
1334                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1335                 leaderActor.setCurrentBehavior(leader);
1336                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1337
1338                 // create 5 entries in the log
1339                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1340                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1341
1342                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1343                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1344                 //setting replicatedToAllIndex = 9, for the log to clear
1345                 leader.setReplicatedToAllIndex(9);
1346                 assertEquals(5, leaderActor.getReplicatedLog().size());
1347                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1348
1349                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1350                 assertEquals(5, leaderActor.getReplicatedLog().size());
1351                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1352
1353                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1354                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1355                 assertEquals(5, leaderActor.getReplicatedLog().size());
1356                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1357
1358                 // simulate a real snapshot
1359                 leaderActor.onReceiveCommand(new SendHeartBeat());
1360                 assertEquals(5, leaderActor.getReplicatedLog().size());
1361                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1362                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1363                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1364
1365
1366                 //reply from a slow follower does not initiate a fake snapshot
1367                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1368                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1369
1370                 ByteString snapshotBytes = fromObject(Arrays.asList(
1371                         new MockRaftActorContext.MockPayload("foo-0"),
1372                         new MockRaftActorContext.MockPayload("foo-1"),
1373                         new MockRaftActorContext.MockPayload("foo-2"),
1374                         new MockRaftActorContext.MockPayload("foo-3"),
1375                         new MockRaftActorContext.MockPayload("foo-4")));
1376                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1377                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1378
1379                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1380
1381                 //reply from a slow follower after should not raise errors
1382                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1383                 assertEquals(0, leaderActor.getReplicatedLog().size());
1384             }
1385         };
1386     }
1387
1388     @Test
1389     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1390         new JavaTestKit(getSystem()) {{
1391             String persistenceId = factory.generateActorId("leader-");
1392             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1393             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1394             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1395             config.setSnapshotBatchCount(5);
1396
1397             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1398
1399             Map<String, String> peerAddresses = new HashMap<>();
1400
1401             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1402                     MockRaftActor.props(persistenceId, peerAddresses,
1403                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1404
1405             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1406             leaderActor.getRaftActorContext().setCommitIndex(3);
1407             leaderActor.getRaftActorContext().setLastApplied(3);
1408             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1409
1410             leaderActor.waitForInitializeBehaviorComplete();
1411             for(int i=0;i< 4;i++) {
1412                 leaderActor.getReplicatedLog()
1413                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1414                                 new MockRaftActorContext.MockPayload("A")));
1415             }
1416
1417             Leader leader = new Leader(leaderActor.getRaftActorContext());
1418             leaderActor.setCurrentBehavior(leader);
1419             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1420
1421             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1422             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1423
1424             // Now send a CaptureSnapshotReply
1425             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1426
1427             // Trimming log in this scenario is a no-op
1428             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1429             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1430             assertEquals(-1, leader.getReplicatedToAllIndex());
1431
1432         }};
1433     }
1434
1435     @Test
1436     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1437         new JavaTestKit(getSystem()) {{
1438             String persistenceId = factory.generateActorId("leader-");
1439             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1440             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1441             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1442             config.setSnapshotBatchCount(5);
1443
1444             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1445
1446             Map<String, String> peerAddresses = new HashMap<>();
1447
1448             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1449                     MockRaftActor.props(persistenceId, peerAddresses,
1450                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1451
1452             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1453             leaderActor.getRaftActorContext().setCommitIndex(3);
1454             leaderActor.getRaftActorContext().setLastApplied(3);
1455             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1456             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1457
1458             leaderActor.waitForInitializeBehaviorComplete();
1459             Leader leader = new Leader(leaderActor.getRaftActorContext());
1460             leaderActor.setCurrentBehavior(leader);
1461             leader.setReplicatedToAllIndex(3);
1462             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1463
1464             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1465             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1466
1467             // Now send a CaptureSnapshotReply
1468             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1469
1470             // Trimming log in this scenario is a no-op
1471             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1472             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1473             assertEquals(3, leader.getReplicatedToAllIndex());
1474
1475         }};
1476     }
1477
1478     private ByteString fromObject(Object snapshot) throws Exception {
1479         ByteArrayOutputStream b = null;
1480         ObjectOutputStream o = null;
1481         try {
1482             b = new ByteArrayOutputStream();
1483             o = new ObjectOutputStream(b);
1484             o.writeObject(snapshot);
1485             byte[] snapshotBytes = b.toByteArray();
1486             return ByteString.copyFrom(snapshotBytes);
1487         } finally {
1488             if (o != null) {
1489                 o.flush();
1490                 o.close();
1491             }
1492             if (b != null) {
1493                 b.close();
1494             }
1495         }
1496     }
1497
1498 }