Calculate replicated log data size on recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.Nonnull;
53 import org.junit.After;
54 import org.junit.Assert;
55 import org.junit.Before;
56 import org.junit.Test;
57 import org.opendaylight.controller.cluster.DataPersistenceProvider;
58 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
59 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
60 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
61 import org.opendaylight.controller.cluster.notifications.RoleChanged;
62 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
67 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
69 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
70 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
71 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
72 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
73 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
74 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
75 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
78 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import scala.concurrent.Await;
81 import scala.concurrent.Future;
82 import scala.concurrent.duration.Duration;
83 import scala.concurrent.duration.FiniteDuration;
84
85 public class RaftActorTest extends AbstractActorTest {
86
87     private TestActorFactory factory;
88
89     @Before
90     public void setUp(){
91         factory = new TestActorFactory(getSystem());
92     }
93
94     @After
95     public void tearDown() throws Exception {
96         factory.close();
97         InMemoryJournal.clear();
98         InMemorySnapshotStore.clear();
99     }
100
101     public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
102
103         private final RaftActor actorDelegate;
104         private final RaftActorRecoveryCohort recoveryCohortDelegate;
105         private final RaftActorSnapshotCohort snapshotCohortDelegate;
106         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
107         private final List<Object> state;
108         private ActorRef roleChangeNotifier;
109         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
110
111         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
112             private static final long serialVersionUID = 1L;
113             private final Map<String, String> peerAddresses;
114             private final String id;
115             private final Optional<ConfigParams> config;
116             private final DataPersistenceProvider dataPersistenceProvider;
117             private final ActorRef roleChangeNotifier;
118
119             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
120                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
121                 ActorRef roleChangeNotifier) {
122                 this.peerAddresses = peerAddresses;
123                 this.id = id;
124                 this.config = config;
125                 this.dataPersistenceProvider = dataPersistenceProvider;
126                 this.roleChangeNotifier = roleChangeNotifier;
127             }
128
129             @Override
130             public MockRaftActor create() throws Exception {
131                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
132                     dataPersistenceProvider);
133                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
134                 return mockRaftActor;
135             }
136         }
137
138         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
139                              DataPersistenceProvider dataPersistenceProvider) {
140             super(id, peerAddresses, config);
141             state = new ArrayList<>();
142             this.actorDelegate = mock(RaftActor.class);
143             this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
144             this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
145             if(dataPersistenceProvider == null){
146                 setPersistence(true);
147             } else {
148                 setPersistence(dataPersistenceProvider);
149             }
150         }
151
152         public void waitForRecoveryComplete() {
153             try {
154                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
155             } catch (InterruptedException e) {
156                 e.printStackTrace();
157             }
158         }
159
160         public void waitForInitializeBehaviorComplete() {
161             try {
162                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
163             } catch (InterruptedException e) {
164                 e.printStackTrace();
165             }
166         }
167
168
169         public void waitUntilLeader(){
170             for(int i = 0;i < 10; i++){
171                 if(isLeader()){
172                     break;
173                 }
174                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
175             }
176         }
177
178         public List<Object> getState() {
179             return state;
180         }
181
182         public static Props props(final String id, final Map<String, String> peerAddresses,
183                 Optional<ConfigParams> config){
184             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
185         }
186
187         public static Props props(final String id, final Map<String, String> peerAddresses,
188                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
189             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
190         }
191
192         public static Props props(final String id, final Map<String, String> peerAddresses,
193             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
194             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
195         }
196
197         public static Props props(final String id, final Map<String, String> peerAddresses,
198                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
199                                   DataPersistenceProvider dataPersistenceProvider){
200             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
201         }
202
203
204         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
205             actorDelegate.applyState(clientActor, identifier, data);
206             LOG.info("{}: applyState called", persistenceId());
207         }
208
209         @Override
210         @Nonnull
211         protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
212             return this;
213         }
214
215         @Override
216         protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
217             return this;
218         }
219
220         @Override
221         public void startLogRecoveryBatch(int maxBatchSize) {
222         }
223
224         @Override
225         public void appendRecoveredLogEntry(Payload data) {
226             state.add(data);
227         }
228
229         @Override
230         public void applyCurrentLogRecoveryBatch() {
231         }
232
233         @Override
234         protected void onRecoveryComplete() {
235             actorDelegate.onRecoveryComplete();
236             recoveryComplete.countDown();
237         }
238
239         @Override
240         protected void initializeBehavior() {
241             super.initializeBehavior();
242             initializeBehaviorComplete.countDown();
243         }
244
245         @Override
246         public void applyRecoverySnapshot(byte[] bytes) {
247             recoveryCohortDelegate.applyRecoverySnapshot(bytes);
248             try {
249                 Object data = toObject(bytes);
250                 if (data instanceof List) {
251                     state.addAll((List<?>) data);
252                 }
253             } catch (Exception e) {
254                 e.printStackTrace();
255             }
256         }
257
258         @Override
259         public void createSnapshot(ActorRef actorRef) {
260             LOG.info("{}: createSnapshot called", persistenceId());
261             snapshotCohortDelegate.createSnapshot(actorRef);
262         }
263
264         @Override
265         public void applySnapshot(byte [] snapshot) {
266             LOG.info("{}: applySnapshot called", persistenceId());
267             snapshotCohortDelegate.applySnapshot(snapshot);
268         }
269
270         @Override
271         protected void onStateChanged() {
272             actorDelegate.onStateChanged();
273         }
274
275         @Override
276         protected Optional<ActorRef> getRoleChangeNotifier() {
277             return Optional.fromNullable(roleChangeNotifier);
278         }
279
280         @Override public String persistenceId() {
281             return this.getId();
282         }
283
284         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
285             Object obj = null;
286             ByteArrayInputStream bis = null;
287             ObjectInputStream ois = null;
288             try {
289                 bis = new ByteArrayInputStream(bs);
290                 ois = new ObjectInputStream(bis);
291                 obj = ois.readObject();
292             } finally {
293                 if (bis != null) {
294                     bis.close();
295                 }
296                 if (ois != null) {
297                     ois.close();
298                 }
299             }
300             return obj;
301         }
302
303         public ReplicatedLog getReplicatedLog(){
304             return this.getRaftActorContext().getReplicatedLog();
305         }
306     }
307
308
309     public static class RaftActorTestKit extends JavaTestKit {
310         private final ActorRef raftActor;
311
312         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
313             super(actorSystem);
314
315             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
316                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
317
318         }
319
320
321         public ActorRef getRaftActor() {
322             return raftActor;
323         }
324
325         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
326             // Wait for a specific log message to show up
327             return
328                 new JavaTestKit.EventFilter<Boolean>(logEventClass
329                 ) {
330                     @Override
331                     protected Boolean run() {
332                         return true;
333                     }
334                 }.from(raftActor.path().toString())
335                     .message(message)
336                     .occurrences(1).exec();
337
338
339         }
340
341         protected void waitUntilLeader(){
342             waitUntilLeader(raftActor);
343         }
344
345         public static void waitUntilLeader(ActorRef actorRef) {
346             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
347             for(int i = 0; i < 20 * 5; i++) {
348                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
349                 try {
350                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
351                     if(resp.getLeaderActor() != null) {
352                         return;
353                     }
354                 } catch(TimeoutException e) {
355                 } catch(Exception e) {
356                     System.err.println("FindLeader threw ex");
357                     e.printStackTrace();
358                 }
359
360
361                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
362             }
363
364             Assert.fail("Leader not found for actorRef " + actorRef.path());
365         }
366
367     }
368
369
370     @Test
371     public void testConstruction() {
372         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
373     }
374
375     @Test
376     public void testFindLeaderWhenLeaderIsSelf(){
377         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
378         kit.waitUntilLeader();
379     }
380
381     @Test
382     public void testRaftActorRecovery() throws Exception {
383         new JavaTestKit(getSystem()) {{
384             String persistenceId = factory.generateActorId("follower-");
385
386             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
387             // Set the heartbeat interval high to essentially disable election otherwise the test
388             // may fail if the actor is switched to Leader and the commitIndex is set to the last
389             // log entry.
390             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
391
392             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
393                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
394
395             watch(followerActor);
396
397             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
398             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
399                     new MockRaftActorContext.MockPayload("E"));
400             snapshotUnappliedEntries.add(entry1);
401
402             int lastAppliedDuringSnapshotCapture = 3;
403             int lastIndexDuringSnapshotCapture = 4;
404
405             // 4 messages as part of snapshot, which are applied to state
406             ByteString snapshotBytes = fromObject(Arrays.asList(
407                     new MockRaftActorContext.MockPayload("A"),
408                     new MockRaftActorContext.MockPayload("B"),
409                     new MockRaftActorContext.MockPayload("C"),
410                     new MockRaftActorContext.MockPayload("D")));
411
412             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
413                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
414                     lastAppliedDuringSnapshotCapture, 1);
415             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
416
417             // add more entries after snapshot is taken
418             List<ReplicatedLogEntry> entries = new ArrayList<>();
419             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
420                     new MockRaftActorContext.MockPayload("F", 2));
421             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
422                     new MockRaftActorContext.MockPayload("G", 3));
423             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
424                     new MockRaftActorContext.MockPayload("H", 4));
425             entries.add(entry2);
426             entries.add(entry3);
427             entries.add(entry4);
428
429             int lastAppliedToState = 5;
430             int lastIndex = 7;
431
432             InMemoryJournal.addEntry(persistenceId, 5, entry2);
433             // 2 entries are applied to state besides the 4 entries in snapshot
434             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
435             InMemoryJournal.addEntry(persistenceId, 7, entry3);
436             InMemoryJournal.addEntry(persistenceId, 8, entry4);
437
438             // kill the actor
439             followerActor.tell(PoisonPill.getInstance(), null);
440             expectMsgClass(duration("5 seconds"), Terminated.class);
441
442             unwatch(followerActor);
443
444             //reinstate the actor
445             TestActorRef<MockRaftActor> ref = factory.createTestActor(
446                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
447                             Optional.<ConfigParams>of(config)));
448
449             ref.underlyingActor().waitForRecoveryComplete();
450
451             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
452             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
453                     context.getReplicatedLog().size());
454             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
455             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
456             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
457             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
458             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
459         }};
460     }
461
462     @Test
463     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
464         new JavaTestKit(getSystem()) {{
465             String persistenceId = factory.generateActorId("leader-");
466
467             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
468             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
469
470             // Setup the persisted journal with some entries
471             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
472                     new MockRaftActorContext.MockPayload("zero"));
473             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
474                     new MockRaftActorContext.MockPayload("oen"));
475             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
476                     new MockRaftActorContext.MockPayload("two"));
477
478             long seqNr = 1;
479             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
480             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
481             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
482             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
483
484             int lastAppliedToState = 1;
485             int lastIndex = 2;
486
487             //reinstate the actor
488             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
489                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
490                             Optional.<ConfigParams>of(config)));
491
492             leaderActor.underlyingActor().waitForRecoveryComplete();
493
494             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
495             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
496             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
497             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
498             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
499         }};
500     }
501
502     /**
503      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
504      * process recovery messages
505      *
506      * @throws Exception
507      */
508
509     @Test
510     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
511         new JavaTestKit(getSystem()) {
512             {
513                 String persistenceId = factory.generateActorId("leader-");
514
515                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
516
517                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
518
519                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
520                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
521
522                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
523
524                 // Wait for akka's recovery to complete so it doesn't interfere.
525                 mockRaftActor.waitForRecoveryComplete();
526
527                 ByteString snapshotBytes = fromObject(Arrays.asList(
528                         new MockRaftActorContext.MockPayload("A"),
529                         new MockRaftActorContext.MockPayload("B"),
530                         new MockRaftActorContext.MockPayload("C"),
531                         new MockRaftActorContext.MockPayload("D")));
532
533                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
534                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
535
536                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
537
538                 verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
539
540                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
541
542                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
543
544                 assertEquals("add replicated log entry", 1, replicatedLog.size());
545
546                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
547
548                 assertEquals("add replicated log entry", 2, replicatedLog.size());
549
550                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
551
552                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
553
554                 // The snapshot had 4 items + we added 2 more items during the test
555                 // We start removing from 5 and we should get 1 item in the replicated log
556                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
557
558                 assertEquals("remove log entries", 1, replicatedLog.size());
559
560                 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
561
562                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
563                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
564
565                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
566
567             }};
568     }
569
570     /**
571      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
572      * not process recovery messages
573      *
574      * @throws Exception
575      */
576     @Test
577     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
578         new JavaTestKit(getSystem()) {
579             {
580                 String persistenceId = factory.generateActorId("leader-");
581
582                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
583
584                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
585
586                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
587                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
588
589                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
590
591                 // Wait for akka's recovery to complete so it doesn't interfere.
592                 mockRaftActor.waitForRecoveryComplete();
593
594                 ByteString snapshotBytes = fromObject(Arrays.asList(
595                         new MockRaftActorContext.MockPayload("A"),
596                         new MockRaftActorContext.MockPayload("B"),
597                         new MockRaftActorContext.MockPayload("C"),
598                         new MockRaftActorContext.MockPayload("D")));
599
600                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
601                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
602
603                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
604
605                 verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
606
607                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
608
609                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
610
611                 assertEquals("add replicated log entry", 0, replicatedLog.size());
612
613                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
614
615                 assertEquals("add replicated log entry", 0, replicatedLog.size());
616
617                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
618
619                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
620
621                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
622
623                 assertEquals("remove log entries", 0, replicatedLog.size());
624
625                 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
626
627                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
628                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
629
630                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
631             }};
632     }
633
634
635     @Test
636     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
637         new JavaTestKit(getSystem()) {
638             {
639                 String persistenceId = factory.generateActorId("leader-");
640
641                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
642
643                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
644
645                 CountDownLatch persistLatch = new CountDownLatch(1);
646                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
647                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
648
649                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
650                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
651
652                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
653
654                 mockRaftActor.waitForInitializeBehaviorComplete();
655
656                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
657
658                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
659             }
660         };
661     }
662
663     @Test
664     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
665         new JavaTestKit(getSystem()) {
666             {
667                 String persistenceId = factory.generateActorId("leader-");
668
669                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
670
671                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
672
673                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
674
675                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
676                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
677
678                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
679
680                 mockRaftActor.waitForInitializeBehaviorComplete();
681
682                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
683
684                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
685
686                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
687             }
688         };
689     }
690
691     @Test
692     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
693         new JavaTestKit(getSystem()) {
694             {
695                 String persistenceId = factory.generateActorId("leader-");
696
697                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
698
699                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
700
701                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
702
703                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
704                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
705
706                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
707
708                 mockRaftActor.waitForInitializeBehaviorComplete();
709
710                 mockRaftActor.waitUntilLeader();
711
712                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
713
714                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
715
716                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
717             }
718         };
719     }
720
721     @Test
722     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
723         new JavaTestKit(getSystem()) {
724             {
725                 String persistenceId = factory.generateActorId("leader-");
726
727                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
728
729                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
730
731                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
732
733                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
734                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
735
736                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
737
738                 mockRaftActor.waitForInitializeBehaviorComplete();
739
740                 mockRaftActor.waitUntilLeader();
741
742                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
743
744                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
745
746             }
747
748         };
749     }
750
751     @Test
752     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
753         new JavaTestKit(getSystem()) {
754             {
755                 String persistenceId = factory.generateActorId("leader-");
756
757                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
758
759                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
760
761                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
762
763                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
764                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
765                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
766
767                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
768
769                 mockRaftActor.waitForInitializeBehaviorComplete();
770
771                 ByteString snapshotBytes = fromObject(Arrays.asList(
772                         new MockRaftActorContext.MockPayload("A"),
773                         new MockRaftActorContext.MockPayload("B"),
774                         new MockRaftActorContext.MockPayload("C"),
775                         new MockRaftActorContext.MockPayload("D")));
776
777                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
778
779                 raftActorContext.getSnapshotManager().capture(
780                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
781                                 new MockRaftActorContext.MockPayload("D")), -1);
782
783                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
784
785                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
786
787                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
788
789             }
790         };
791     }
792
793     @Test
794     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
795         new JavaTestKit(getSystem()) {
796             {
797                 String persistenceId = factory.generateActorId("leader-");
798
799                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
800
801                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
802
803                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
804
805                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
806                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
807
808                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
809
810                 mockRaftActor.waitForInitializeBehaviorComplete();
811                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
812
813                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
814                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
815                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
816                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
817                 mockRaftActor.getReplicatedLog().append(lastEntry);
818
819                 ByteString snapshotBytes = fromObject(Arrays.asList(
820                         new MockRaftActorContext.MockPayload("A"),
821                         new MockRaftActorContext.MockPayload("B"),
822                         new MockRaftActorContext.MockPayload("C"),
823                         new MockRaftActorContext.MockPayload("D")));
824
825                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
826                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
827
828                 long replicatedToAllIndex = 1;
829
830                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
831
832                 verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
833
834                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
835
836                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
837
838                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
839
840                 verify(dataPersistenceProvider).deleteMessages(100);
841
842                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
843                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
844
845                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
846                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
847                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
848
849                 // Index 2 will not be in the log because it was removed due to snapshotting
850                 assertNull(mockRaftActor.getReplicatedLog().get(1));
851                 assertNull(mockRaftActor.getReplicatedLog().get(0));
852
853             }
854         };
855     }
856
857     @Test
858     public void testApplyState() throws Exception {
859
860         new JavaTestKit(getSystem()) {
861             {
862                 String persistenceId = factory.generateActorId("leader-");
863
864                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
865
866                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
867
868                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
869
870                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
871                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
872
873                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
874
875                 mockRaftActor.waitForInitializeBehaviorComplete();
876
877                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
878                         new MockRaftActorContext.MockPayload("F"));
879
880                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
881
882                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
883
884             }
885         };
886     }
887
888     @Test
889     public void testApplySnapshot() throws Exception {
890         new JavaTestKit(getSystem()) {
891             {
892                 String persistenceId = factory.generateActorId("leader-");
893
894                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
895
896                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
897
898                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
899
900                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
901                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
902
903                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
904
905                 mockRaftActor.waitForInitializeBehaviorComplete();
906
907                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
908
909                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
910                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
911                 oldReplicatedLog.append(
912                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
913                                 mock(Payload.class)));
914
915                 ByteString snapshotBytes = fromObject(Arrays.asList(
916                         new MockRaftActorContext.MockPayload("A"),
917                         new MockRaftActorContext.MockPayload("B"),
918                         new MockRaftActorContext.MockPayload("C"),
919                         new MockRaftActorContext.MockPayload("D")));
920
921                 Snapshot snapshot = mock(Snapshot.class);
922
923                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
924
925                 doReturn(3L).when(snapshot).getLastAppliedIndex();
926
927                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
928
929                 verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
930
931                 assertTrue("The replicatedLog should have changed",
932                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
933
934                 assertEquals("lastApplied should be same as in the snapshot",
935                         (Long) 3L, mockRaftActor.getLastApplied());
936
937                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
938
939             }
940         };
941     }
942
943     @Test
944     public void testSaveSnapshotFailure() throws Exception {
945         new JavaTestKit(getSystem()) {
946             {
947                 String persistenceId = factory.generateActorId("leader-");
948
949                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
950
951                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
952
953                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
954
955                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
956                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
957
958                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
959
960                 mockRaftActor.waitForInitializeBehaviorComplete();
961
962                 ByteString snapshotBytes = fromObject(Arrays.asList(
963                         new MockRaftActorContext.MockPayload("A"),
964                         new MockRaftActorContext.MockPayload("B"),
965                         new MockRaftActorContext.MockPayload("C"),
966                         new MockRaftActorContext.MockPayload("D")));
967
968                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
969
970                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
971
972                 raftActorContext.getSnapshotManager().capture(
973                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
974                                 new MockRaftActorContext.MockPayload("D")), 1);
975
976                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
977
978                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
979                         new Exception()));
980
981                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
982                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
983
984             }
985         };
986     }
987
988     @Test
989     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
990         new JavaTestKit(getSystem()) {{
991             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
992                     Props.create(MessageCollectorActor.class));
993             MessageCollectorActor.waitUntilReady(notifierActor);
994
995             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
996             long heartBeatInterval = 100;
997             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
998             config.setElectionTimeoutFactor(20);
999
1000             String persistenceId = factory.generateActorId("notifier-");
1001
1002             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1003                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
1004                     new NonPersistentDataProvider()), persistenceId);
1005
1006             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
1007
1008
1009             // check if the notifier got a role change from null to Follower
1010             RoleChanged raftRoleChanged = matches.get(0);
1011             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1012             assertNull(raftRoleChanged.getOldRole());
1013             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1014
1015             // check if the notifier got a role change from Follower to Candidate
1016             raftRoleChanged = matches.get(1);
1017             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1018             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1019             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1020
1021             // check if the notifier got a role change from Candidate to Leader
1022             raftRoleChanged = matches.get(2);
1023             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1024             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1025             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1026
1027             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1028                     notifierActor, LeaderStateChanged.class);
1029
1030             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1031
1032             notifierActor.underlyingActor().clear();
1033
1034             MockRaftActor raftActor = raftActorRef.underlyingActor();
1035             final String newLeaderId = "new-leader";
1036             Follower follower = new Follower(raftActor.getRaftActorContext()) {
1037                 @Override
1038                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1039                     leaderId = newLeaderId;
1040                     return this;
1041                 }
1042             };
1043
1044             raftActor.changeCurrentBehavior(follower);
1045
1046             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1047             assertEquals(persistenceId, leaderStateChange.getMemberId());
1048             assertEquals(null, leaderStateChange.getLeaderId());
1049
1050             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1051             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1052             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1053
1054             notifierActor.underlyingActor().clear();
1055
1056             raftActor.handleCommand("any");
1057
1058             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1059             assertEquals(persistenceId, leaderStateChange.getMemberId());
1060             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1061         }};
1062     }
1063
1064     @Test
1065     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1066         new JavaTestKit(getSystem()) {{
1067             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1068             MessageCollectorActor.waitUntilReady(notifierActor);
1069
1070             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1071             long heartBeatInterval = 100;
1072             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1073             config.setElectionTimeoutFactor(1);
1074
1075             String persistenceId = factory.generateActorId("notifier-");
1076
1077             factory.createActor(MockRaftActor.props(persistenceId,
1078                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1079
1080             List<RoleChanged> matches =  null;
1081             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1082                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1083                 assertNotNull(matches);
1084                 if(matches.size() == 3) {
1085                     break;
1086                 }
1087                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1088             }
1089
1090             assertEquals(2, matches.size());
1091
1092             // check if the notifier got a role change from null to Follower
1093             RoleChanged raftRoleChanged = matches.get(0);
1094             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1095             assertNull(raftRoleChanged.getOldRole());
1096             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1097
1098             // check if the notifier got a role change from Follower to Candidate
1099             raftRoleChanged = matches.get(1);
1100             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1101             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1102             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1103
1104         }};
1105     }
1106
1107     @Test
1108     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1109         new JavaTestKit(getSystem()) {
1110             {
1111                 String persistenceId = factory.generateActorId("leader-");
1112                 String follower1Id = factory.generateActorId("follower-");
1113
1114                 ActorRef followerActor1 =
1115                         factory.createActor(Props.create(MessageCollectorActor.class));
1116
1117                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1118                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1119                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1120
1121                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1122
1123                 Map<String, String> peerAddresses = new HashMap<>();
1124                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1125
1126                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1127                         MockRaftActor.props(persistenceId, peerAddresses,
1128                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1129
1130                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1131
1132                 leaderActor.getRaftActorContext().setCommitIndex(4);
1133                 leaderActor.getRaftActorContext().setLastApplied(4);
1134                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1135
1136                 leaderActor.waitForInitializeBehaviorComplete();
1137
1138                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1139
1140                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1141                 leaderActor.setCurrentBehavior(leader);
1142                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1143
1144                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1145                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1146
1147                 assertEquals(8, leaderActor.getReplicatedLog().size());
1148
1149                 leaderActor.getRaftActorContext().getSnapshotManager()
1150                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1151                                 new MockRaftActorContext.MockPayload("x")), 4);
1152
1153                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1154
1155                 assertEquals(8, leaderActor.getReplicatedLog().size());
1156
1157                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1158                 //fake snapshot on index 5
1159                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1160
1161                 assertEquals(8, leaderActor.getReplicatedLog().size());
1162
1163                 //fake snapshot on index 6
1164                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1165                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1166                 assertEquals(8, leaderActor.getReplicatedLog().size());
1167
1168                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1169
1170                 assertEquals(8, leaderActor.getReplicatedLog().size());
1171
1172                 ByteString snapshotBytes = fromObject(Arrays.asList(
1173                         new MockRaftActorContext.MockPayload("foo-0"),
1174                         new MockRaftActorContext.MockPayload("foo-1"),
1175                         new MockRaftActorContext.MockPayload("foo-2"),
1176                         new MockRaftActorContext.MockPayload("foo-3"),
1177                         new MockRaftActorContext.MockPayload("foo-4")));
1178
1179                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1180                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1181
1182                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1183
1184                 // The commit is needed to complete the snapshot creation process
1185                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1186
1187                 // capture snapshot reply should remove the snapshotted entries only
1188                 assertEquals(3, leaderActor.getReplicatedLog().size());
1189                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1190
1191                 // add another non-replicated entry
1192                 leaderActor.getReplicatedLog().append(
1193                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1194
1195                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1196                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1197                 assertEquals(2, leaderActor.getReplicatedLog().size());
1198                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1199
1200             }
1201         };
1202     }
1203
1204     @Test
1205     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1206         new JavaTestKit(getSystem()) {
1207             {
1208                 String persistenceId = factory.generateActorId("follower-");
1209                 String leaderId = factory.generateActorId("leader-");
1210
1211
1212                 ActorRef leaderActor1 =
1213                         factory.createActor(Props.create(MessageCollectorActor.class));
1214
1215                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1216                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1217                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1218
1219                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1220
1221                 Map<String, String> peerAddresses = new HashMap<>();
1222                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1223
1224                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1225                         MockRaftActor.props(persistenceId, peerAddresses,
1226                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1227
1228                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1229                 followerActor.getRaftActorContext().setCommitIndex(4);
1230                 followerActor.getRaftActorContext().setLastApplied(4);
1231                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1232
1233                 followerActor.waitForInitializeBehaviorComplete();
1234
1235
1236                 Follower follower = new Follower(followerActor.getRaftActorContext());
1237                 followerActor.setCurrentBehavior(follower);
1238                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1239
1240                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1241                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1242                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1243
1244                 // log has indices 0-5
1245                 assertEquals(6, followerActor.getReplicatedLog().size());
1246
1247                 //snapshot on 4
1248                 followerActor.getRaftActorContext().getSnapshotManager().capture(
1249                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1250                                 new MockRaftActorContext.MockPayload("D")), 4);
1251
1252                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1253
1254                 assertEquals(6, followerActor.getReplicatedLog().size());
1255
1256                 //fake snapshot on index 6
1257                 List<ReplicatedLogEntry> entries =
1258                         Arrays.asList(
1259                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1260                                         new MockRaftActorContext.MockPayload("foo-6"))
1261                         );
1262                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1263                 assertEquals(7, followerActor.getReplicatedLog().size());
1264
1265                 //fake snapshot on index 7
1266                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1267
1268                 entries =
1269                         Arrays.asList(
1270                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1271                                         new MockRaftActorContext.MockPayload("foo-7"))
1272                         );
1273                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1274                 assertEquals(8, followerActor.getReplicatedLog().size());
1275
1276                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1277
1278
1279                 ByteString snapshotBytes = fromObject(Arrays.asList(
1280                         new MockRaftActorContext.MockPayload("foo-0"),
1281                         new MockRaftActorContext.MockPayload("foo-1"),
1282                         new MockRaftActorContext.MockPayload("foo-2"),
1283                         new MockRaftActorContext.MockPayload("foo-3"),
1284                         new MockRaftActorContext.MockPayload("foo-4")));
1285                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1286                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1287
1288                 // The commit is needed to complete the snapshot creation process
1289                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1290
1291                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1292                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1293                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1294
1295                 entries =
1296                         Arrays.asList(
1297                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1298                                         new MockRaftActorContext.MockPayload("foo-7"))
1299                         );
1300                 // send an additional entry 8 with leaderCommit = 7
1301                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1302
1303                 // 7 and 8, as lastapplied is 7
1304                 assertEquals(2, followerActor.getReplicatedLog().size());
1305
1306             }
1307         };
1308     }
1309
1310     @Test
1311     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1312         new JavaTestKit(getSystem()) {
1313             {
1314                 String persistenceId = factory.generateActorId("leader-");
1315                 String follower1Id = factory.generateActorId("follower-");
1316                 String follower2Id = factory.generateActorId("follower-");
1317
1318                 ActorRef followerActor1 =
1319                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1320                 ActorRef followerActor2 =
1321                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1322
1323                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1324                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1325                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1326
1327                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1328
1329                 Map<String, String> peerAddresses = new HashMap<>();
1330                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1331                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1332
1333                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1334                         MockRaftActor.props(persistenceId, peerAddresses,
1335                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1336
1337                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1338                 leaderActor.getRaftActorContext().setCommitIndex(9);
1339                 leaderActor.getRaftActorContext().setLastApplied(9);
1340                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1341
1342                 leaderActor.waitForInitializeBehaviorComplete();
1343
1344                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1345                 leaderActor.setCurrentBehavior(leader);
1346                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1347
1348                 // create 5 entries in the log
1349                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1350                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1351
1352                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1353                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1354                 //setting replicatedToAllIndex = 9, for the log to clear
1355                 leader.setReplicatedToAllIndex(9);
1356                 assertEquals(5, leaderActor.getReplicatedLog().size());
1357                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1358
1359                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1360                 assertEquals(5, leaderActor.getReplicatedLog().size());
1361                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1362
1363                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1364                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1365                 assertEquals(5, leaderActor.getReplicatedLog().size());
1366                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1367
1368                 // simulate a real snapshot
1369                 leaderActor.onReceiveCommand(new SendHeartBeat());
1370                 assertEquals(5, leaderActor.getReplicatedLog().size());
1371                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1372                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1373                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1374
1375
1376                 //reply from a slow follower does not initiate a fake snapshot
1377                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1378                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1379
1380                 ByteString snapshotBytes = fromObject(Arrays.asList(
1381                         new MockRaftActorContext.MockPayload("foo-0"),
1382                         new MockRaftActorContext.MockPayload("foo-1"),
1383                         new MockRaftActorContext.MockPayload("foo-2"),
1384                         new MockRaftActorContext.MockPayload("foo-3"),
1385                         new MockRaftActorContext.MockPayload("foo-4")));
1386                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1387                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1388
1389                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1390
1391                 //reply from a slow follower after should not raise errors
1392                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1393                 assertEquals(0, leaderActor.getReplicatedLog().size());
1394             }
1395         };
1396     }
1397
1398     @Test
1399     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1400         new JavaTestKit(getSystem()) {{
1401             String persistenceId = factory.generateActorId("leader-");
1402             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1403             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1404             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1405             config.setSnapshotBatchCount(5);
1406
1407             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1408
1409             Map<String, String> peerAddresses = new HashMap<>();
1410
1411             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1412                     MockRaftActor.props(persistenceId, peerAddresses,
1413                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1414
1415             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1416             leaderActor.getRaftActorContext().setCommitIndex(3);
1417             leaderActor.getRaftActorContext().setLastApplied(3);
1418             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1419
1420             leaderActor.waitForInitializeBehaviorComplete();
1421             for(int i=0;i< 4;i++) {
1422                 leaderActor.getReplicatedLog()
1423                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1424                                 new MockRaftActorContext.MockPayload("A")));
1425             }
1426
1427             Leader leader = new Leader(leaderActor.getRaftActorContext());
1428             leaderActor.setCurrentBehavior(leader);
1429             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1430
1431             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1432             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1433
1434             // Now send a CaptureSnapshotReply
1435             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1436
1437             // Trimming log in this scenario is a no-op
1438             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1439             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1440             assertEquals(-1, leader.getReplicatedToAllIndex());
1441
1442         }};
1443     }
1444
1445     @Test
1446     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1447         new JavaTestKit(getSystem()) {{
1448             String persistenceId = factory.generateActorId("leader-");
1449             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1450             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1451             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1452             config.setSnapshotBatchCount(5);
1453
1454             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1455
1456             Map<String, String> peerAddresses = new HashMap<>();
1457
1458             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1459                     MockRaftActor.props(persistenceId, peerAddresses,
1460                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1461
1462             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1463             leaderActor.getRaftActorContext().setCommitIndex(3);
1464             leaderActor.getRaftActorContext().setLastApplied(3);
1465             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1466             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1467
1468             leaderActor.waitForInitializeBehaviorComplete();
1469             Leader leader = new Leader(leaderActor.getRaftActorContext());
1470             leaderActor.setCurrentBehavior(leader);
1471             leader.setReplicatedToAllIndex(3);
1472             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1473
1474             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1475             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1476
1477             // Now send a CaptureSnapshotReply
1478             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1479
1480             // Trimming log in this scenario is a no-op
1481             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1482             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1483             assertEquals(3, leader.getReplicatedToAllIndex());
1484
1485         }};
1486     }
1487
1488     private ByteString fromObject(Object snapshot) throws Exception {
1489         ByteArrayOutputStream b = null;
1490         ObjectOutputStream o = null;
1491         try {
1492             b = new ByteArrayOutputStream();
1493             o = new ObjectOutputStream(b);
1494             o.writeObject(snapshot);
1495             byte[] snapshotBytes = b.toByteArray();
1496             return ByteString.copyFrom(snapshotBytes);
1497         } finally {
1498             if (o != null) {
1499                 o.flush();
1500                 o.close();
1501             }
1502             if (b != null) {
1503                 b.close();
1504             }
1505         }
1506     }
1507
1508 }