BUG 2773 : Transition Shard to Leader state when it has no peers
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RoleChanged;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
64 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
65 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
67 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
68 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
69 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
71 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
73 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
74 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
75 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
76 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
77 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
78 import scala.concurrent.Await;
79 import scala.concurrent.Future;
80 import scala.concurrent.duration.Duration;
81 import scala.concurrent.duration.FiniteDuration;
82
83 public class RaftActorTest extends AbstractActorTest {
84
85     private TestActorFactory factory;
86
87     @Before
88     public void setUp(){
89         factory = new TestActorFactory(getSystem());
90     }
91
92     @After
93     public void tearDown() throws Exception {
94         factory.close();
95         InMemoryJournal.clear();
96         InMemorySnapshotStore.clear();
97     }
98
99     public static class MockRaftActor extends RaftActor {
100
101         protected DataPersistenceProvider dataPersistenceProvider;
102         private final RaftActor delegate;
103         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
104         private final List<Object> state;
105         private ActorRef roleChangeNotifier;
106         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
107
108         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
109             private static final long serialVersionUID = 1L;
110             private final Map<String, String> peerAddresses;
111             private final String id;
112             private final Optional<ConfigParams> config;
113             private final DataPersistenceProvider dataPersistenceProvider;
114             private final ActorRef roleChangeNotifier;
115
116             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
117                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
118                 ActorRef roleChangeNotifier) {
119                 this.peerAddresses = peerAddresses;
120                 this.id = id;
121                 this.config = config;
122                 this.dataPersistenceProvider = dataPersistenceProvider;
123                 this.roleChangeNotifier = roleChangeNotifier;
124             }
125
126             @Override
127             public MockRaftActor create() throws Exception {
128                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
129                     dataPersistenceProvider);
130                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
131                 return mockRaftActor;
132             }
133         }
134
135         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
136                              DataPersistenceProvider dataPersistenceProvider) {
137             super(id, peerAddresses, config);
138             state = new ArrayList<>();
139             this.delegate = mock(RaftActor.class);
140             if(dataPersistenceProvider == null){
141                 this.dataPersistenceProvider = new PersistentDataProvider();
142             } else {
143                 this.dataPersistenceProvider = dataPersistenceProvider;
144             }
145         }
146
147         public void waitForRecoveryComplete() {
148             try {
149                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
150             } catch (InterruptedException e) {
151                 e.printStackTrace();
152             }
153         }
154
155         public void waitForInitializeBehaviorComplete() {
156             try {
157                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
158             } catch (InterruptedException e) {
159                 e.printStackTrace();
160             }
161         }
162
163
164         public void waitUntilLeader(){
165             for(int i = 0;i < 10; i++){
166                 if(isLeader()){
167                     break;
168                 }
169                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
170             }
171         }
172
173         public List<Object> getState() {
174             return state;
175         }
176
177         public static Props props(final String id, final Map<String, String> peerAddresses,
178                 Optional<ConfigParams> config){
179             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
180         }
181
182         public static Props props(final String id, final Map<String, String> peerAddresses,
183                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
184             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
185         }
186
187         public static Props props(final String id, final Map<String, String> peerAddresses,
188             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
189             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
190         }
191
192         public static Props props(final String id, final Map<String, String> peerAddresses,
193                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
194                                   DataPersistenceProvider dataPersistenceProvider){
195             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
196         }
197
198
199         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
200             delegate.applyState(clientActor, identifier, data);
201             LOG.info("{}: applyState called", persistenceId());
202         }
203
204         @Override
205         protected void startLogRecoveryBatch(int maxBatchSize) {
206         }
207
208         @Override
209         protected void appendRecoveredLogEntry(Payload data) {
210             state.add(data);
211         }
212
213         @Override
214         protected void applyCurrentLogRecoveryBatch() {
215         }
216
217         @Override
218         protected void onRecoveryComplete() {
219             delegate.onRecoveryComplete();
220             recoveryComplete.countDown();
221         }
222
223         @Override
224         protected void initializeBehavior() {
225             super.initializeBehavior();
226             initializeBehaviorComplete.countDown();
227         }
228
229         @Override
230         protected void applyRecoverySnapshot(byte[] bytes) {
231             delegate.applyRecoverySnapshot(bytes);
232             try {
233                 Object data = toObject(bytes);
234                 if (data instanceof List) {
235                     state.addAll((List<?>) data);
236                 }
237             } catch (Exception e) {
238                 e.printStackTrace();
239             }
240         }
241
242         @Override protected void createSnapshot() {
243             LOG.info("{}: createSnapshot called", persistenceId());
244             delegate.createSnapshot();
245         }
246
247         @Override protected void applySnapshot(byte [] snapshot) {
248             LOG.info("{}: applySnapshot called", persistenceId());
249             delegate.applySnapshot(snapshot);
250         }
251
252         @Override protected void onStateChanged() {
253             delegate.onStateChanged();
254         }
255
256         @Override
257         protected DataPersistenceProvider persistence() {
258             return this.dataPersistenceProvider;
259         }
260
261         @Override
262         protected Optional<ActorRef> getRoleChangeNotifier() {
263             return Optional.fromNullable(roleChangeNotifier);
264         }
265
266         @Override public String persistenceId() {
267             return this.getId();
268         }
269
270         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
271             Object obj = null;
272             ByteArrayInputStream bis = null;
273             ObjectInputStream ois = null;
274             try {
275                 bis = new ByteArrayInputStream(bs);
276                 ois = new ObjectInputStream(bis);
277                 obj = ois.readObject();
278             } finally {
279                 if (bis != null) {
280                     bis.close();
281                 }
282                 if (ois != null) {
283                     ois.close();
284                 }
285             }
286             return obj;
287         }
288
289         public ReplicatedLog getReplicatedLog(){
290             return this.getRaftActorContext().getReplicatedLog();
291         }
292
293     }
294
295
296     public static class RaftActorTestKit extends JavaTestKit {
297         private final ActorRef raftActor;
298
299         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
300             super(actorSystem);
301
302             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
303                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
304
305         }
306
307
308         public ActorRef getRaftActor() {
309             return raftActor;
310         }
311
312         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
313             // Wait for a specific log message to show up
314             return
315                 new JavaTestKit.EventFilter<Boolean>(logEventClass
316                 ) {
317                     @Override
318                     protected Boolean run() {
319                         return true;
320                     }
321                 }.from(raftActor.path().toString())
322                     .message(message)
323                     .occurrences(1).exec();
324
325
326         }
327
328         protected void waitUntilLeader(){
329             waitUntilLeader(raftActor);
330         }
331
332         public static void waitUntilLeader(ActorRef actorRef) {
333             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
334             for(int i = 0; i < 20 * 5; i++) {
335                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
336                 try {
337                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
338                     if(resp.getLeaderActor() != null) {
339                         return;
340                     }
341                 } catch(TimeoutException e) {
342                 } catch(Exception e) {
343                     System.err.println("FindLeader threw ex");
344                     e.printStackTrace();
345                 }
346
347
348                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
349             }
350
351             Assert.fail("Leader not found for actorRef " + actorRef.path());
352         }
353
354     }
355
356
357     @Test
358     public void testConstruction() {
359         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
360     }
361
362     @Test
363     public void testFindLeaderWhenLeaderIsSelf(){
364         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
365         kit.waitUntilLeader();
366     }
367
368     @Test
369     public void testRaftActorRecovery() throws Exception {
370         new JavaTestKit(getSystem()) {{
371             String persistenceId = factory.generateActorId("follower-");
372
373             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
374             // Set the heartbeat interval high to essentially disable election otherwise the test
375             // may fail if the actor is switched to Leader and the commitIndex is set to the last
376             // log entry.
377             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
378
379             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
380                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
381
382             watch(followerActor);
383
384             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
385             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
386                     new MockRaftActorContext.MockPayload("E"));
387             snapshotUnappliedEntries.add(entry1);
388
389             int lastAppliedDuringSnapshotCapture = 3;
390             int lastIndexDuringSnapshotCapture = 4;
391
392             // 4 messages as part of snapshot, which are applied to state
393             ByteString snapshotBytes = fromObject(Arrays.asList(
394                     new MockRaftActorContext.MockPayload("A"),
395                     new MockRaftActorContext.MockPayload("B"),
396                     new MockRaftActorContext.MockPayload("C"),
397                     new MockRaftActorContext.MockPayload("D")));
398
399             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
400                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
401                     lastAppliedDuringSnapshotCapture, 1);
402             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
403
404             // add more entries after snapshot is taken
405             List<ReplicatedLogEntry> entries = new ArrayList<>();
406             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
407                     new MockRaftActorContext.MockPayload("F"));
408             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
409                     new MockRaftActorContext.MockPayload("G"));
410             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
411                     new MockRaftActorContext.MockPayload("H"));
412             entries.add(entry2);
413             entries.add(entry3);
414             entries.add(entry4);
415
416             int lastAppliedToState = 5;
417             int lastIndex = 7;
418
419             InMemoryJournal.addEntry(persistenceId, 5, entry2);
420             // 2 entries are applied to state besides the 4 entries in snapshot
421             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
422             InMemoryJournal.addEntry(persistenceId, 7, entry3);
423             InMemoryJournal.addEntry(persistenceId, 8, entry4);
424
425             // kill the actor
426             followerActor.tell(PoisonPill.getInstance(), null);
427             expectMsgClass(duration("5 seconds"), Terminated.class);
428
429             unwatch(followerActor);
430
431             //reinstate the actor
432             TestActorRef<MockRaftActor> ref = factory.createTestActor(
433                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
434                             Optional.<ConfigParams>of(config)));
435
436             ref.underlyingActor().waitForRecoveryComplete();
437
438             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
439             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
440                     context.getReplicatedLog().size());
441             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
442             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
443             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
444             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
445         }};
446     }
447
448     @Test
449     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
450         new JavaTestKit(getSystem()) {{
451             String persistenceId = factory.generateActorId("leader-");
452
453             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
454             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
455
456             // Setup the persisted journal with some entries
457             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
458                     new MockRaftActorContext.MockPayload("zero"));
459             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
460                     new MockRaftActorContext.MockPayload("oen"));
461             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
462                     new MockRaftActorContext.MockPayload("two"));
463
464             long seqNr = 1;
465             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
466             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
467             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
468             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
469
470             int lastAppliedToState = 1;
471             int lastIndex = 2;
472
473             //reinstate the actor
474             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
475                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
476                             Optional.<ConfigParams>of(config)));
477
478             leaderActor.underlyingActor().waitForRecoveryComplete();
479
480             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
481             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
482             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
483             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
484             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
485         }};
486     }
487
488     /**
489      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
490      * process recovery messages
491      *
492      * @throws Exception
493      */
494
495     @Test
496     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
497         new JavaTestKit(getSystem()) {
498             {
499                 String persistenceId = factory.generateActorId("leader-");
500
501                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
502
503                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
504
505                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
506                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
507
508                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
509
510                 // Wait for akka's recovery to complete so it doesn't interfere.
511                 mockRaftActor.waitForRecoveryComplete();
512
513                 ByteString snapshotBytes = fromObject(Arrays.asList(
514                         new MockRaftActorContext.MockPayload("A"),
515                         new MockRaftActorContext.MockPayload("B"),
516                         new MockRaftActorContext.MockPayload("C"),
517                         new MockRaftActorContext.MockPayload("D")));
518
519                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
520                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
521
522                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
523
524                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
525
526                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
527
528                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
529
530                 assertEquals("add replicated log entry", 1, replicatedLog.size());
531
532                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
533
534                 assertEquals("add replicated log entry", 2, replicatedLog.size());
535
536                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
537
538                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
539
540                 // The snapshot had 4 items + we added 2 more items during the test
541                 // We start removing from 5 and we should get 1 item in the replicated log
542                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
543
544                 assertEquals("remove log entries", 1, replicatedLog.size());
545
546                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
547
548                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
549                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
550
551                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
552
553             }};
554     }
555
556     /**
557      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
558      * not process recovery messages
559      *
560      * @throws Exception
561      */
562     @Test
563     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
564         new JavaTestKit(getSystem()) {
565             {
566                 String persistenceId = factory.generateActorId("leader-");
567
568                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
569
570                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
571
572                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
573                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
574
575                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
576
577                 // Wait for akka's recovery to complete so it doesn't interfere.
578                 mockRaftActor.waitForRecoveryComplete();
579
580                 ByteString snapshotBytes = fromObject(Arrays.asList(
581                         new MockRaftActorContext.MockPayload("A"),
582                         new MockRaftActorContext.MockPayload("B"),
583                         new MockRaftActorContext.MockPayload("C"),
584                         new MockRaftActorContext.MockPayload("D")));
585
586                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
587                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
588
589                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
590
591                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
592
593                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
594
595                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
596
597                 assertEquals("add replicated log entry", 0, replicatedLog.size());
598
599                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
600
601                 assertEquals("add replicated log entry", 0, replicatedLog.size());
602
603                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
604
605                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
606
607                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
608
609                 assertEquals("remove log entries", 0, replicatedLog.size());
610
611                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
612
613                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
614                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
615
616                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
617             }};
618     }
619
620
621     @Test
622     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
623         new JavaTestKit(getSystem()) {
624             {
625                 String persistenceId = factory.generateActorId("leader-");
626
627                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
628
629                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630
631                 CountDownLatch persistLatch = new CountDownLatch(1);
632                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
633                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
634
635                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
636                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
637
638                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
639
640                 mockRaftActor.waitForInitializeBehaviorComplete();
641
642                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
643
644                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
645             }
646         };
647     }
648
649     @Test
650     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
651         new JavaTestKit(getSystem()) {
652             {
653                 String persistenceId = factory.generateActorId("leader-");
654
655                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
656
657                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
658
659                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
660
661                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
662                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
663
664                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
665
666                 mockRaftActor.waitForInitializeBehaviorComplete();
667
668                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
669
670                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
671
672                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
673             }
674         };
675     }
676
677     @Test
678     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
679         new JavaTestKit(getSystem()) {
680             {
681                 String persistenceId = factory.generateActorId("leader-");
682
683                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
684
685                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
686
687                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
688
689                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
690                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
691
692                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
693
694                 mockRaftActor.waitForInitializeBehaviorComplete();
695
696                 mockRaftActor.waitUntilLeader();
697
698                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
699
700                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
701
702                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
703             }
704         };
705     }
706
707     @Test
708     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
709         new JavaTestKit(getSystem()) {
710             {
711                 String persistenceId = factory.generateActorId("leader-");
712
713                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
714
715                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
716
717                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
718
719                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
720                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
721
722                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
723
724                 mockRaftActor.waitForInitializeBehaviorComplete();
725
726                 mockRaftActor.waitUntilLeader();
727
728                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
729
730                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
731
732             }
733
734         };
735     }
736
737     @Test
738     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
739         new JavaTestKit(getSystem()) {
740             {
741                 String persistenceId = factory.generateActorId("leader-");
742
743                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
744
745                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
746
747                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
748
749                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
750                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
751                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
752
753                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
754
755                 mockRaftActor.waitForInitializeBehaviorComplete();
756
757                 ByteString snapshotBytes = fromObject(Arrays.asList(
758                         new MockRaftActorContext.MockPayload("A"),
759                         new MockRaftActorContext.MockPayload("B"),
760                         new MockRaftActorContext.MockPayload("C"),
761                         new MockRaftActorContext.MockPayload("D")));
762
763                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
764
765                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
766
767                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
768
769                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
770
771                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
772
773             }
774         };
775     }
776
777     @Test
778     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
779         new JavaTestKit(getSystem()) {
780             {
781                 String persistenceId = factory.generateActorId("leader-");
782
783                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
784
785                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
786
787                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
788
789                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
790                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
791
792                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
793
794                 mockRaftActor.waitForInitializeBehaviorComplete();
795
796                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
797                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
798                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
799                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
800                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
801
802                 ByteString snapshotBytes = fromObject(Arrays.asList(
803                         new MockRaftActorContext.MockPayload("A"),
804                         new MockRaftActorContext.MockPayload("B"),
805                         new MockRaftActorContext.MockPayload("C"),
806                         new MockRaftActorContext.MockPayload("D")));
807
808                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
809                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
810
811                 long replicatedToAllIndex = 1;
812                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
813
814                 verify(mockRaftActor.delegate).createSnapshot();
815
816                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
817
818                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
819
820                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
821
822                 verify(dataPersistenceProvider).deleteMessages(100);
823
824                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
825                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
826
827                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
828                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
829                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
830
831                 // Index 2 will not be in the log because it was removed due to snapshotting
832                 assertNull(mockRaftActor.getReplicatedLog().get(1));
833                 assertNull(mockRaftActor.getReplicatedLog().get(0));
834
835             }
836         };
837     }
838
839     @Test
840     public void testApplyState() throws Exception {
841
842         new JavaTestKit(getSystem()) {
843             {
844                 String persistenceId = factory.generateActorId("leader-");
845
846                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
847
848                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
849
850                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
851
852                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
853                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
854
855                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
856
857                 mockRaftActor.waitForInitializeBehaviorComplete();
858
859                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
860                         new MockRaftActorContext.MockPayload("F"));
861
862                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
863
864                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
865
866             }
867         };
868     }
869
870     @Test
871     public void testApplySnapshot() throws Exception {
872         new JavaTestKit(getSystem()) {
873             {
874                 String persistenceId = factory.generateActorId("leader-");
875
876                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
877
878                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
879
880                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
881
882                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
883                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
884
885                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
886
887                 mockRaftActor.waitForInitializeBehaviorComplete();
888
889                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
890
891                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
892                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
893                 oldReplicatedLog.append(
894                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
895                                 mock(Payload.class)));
896
897                 ByteString snapshotBytes = fromObject(Arrays.asList(
898                         new MockRaftActorContext.MockPayload("A"),
899                         new MockRaftActorContext.MockPayload("B"),
900                         new MockRaftActorContext.MockPayload("C"),
901                         new MockRaftActorContext.MockPayload("D")));
902
903                 Snapshot snapshot = mock(Snapshot.class);
904
905                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
906
907                 doReturn(3L).when(snapshot).getLastAppliedIndex();
908
909                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
910
911                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
912
913                 assertTrue("The replicatedLog should have changed",
914                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
915
916                 assertEquals("lastApplied should be same as in the snapshot",
917                         (Long) 3L, mockRaftActor.getLastApplied());
918
919                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
920
921             }
922         };
923     }
924
925     @Test
926     public void testSaveSnapshotFailure() throws Exception {
927         new JavaTestKit(getSystem()) {
928             {
929                 String persistenceId = factory.generateActorId("leader-");
930
931                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
932
933                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
934
935                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
936
937                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
938                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
939
940                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
941
942                 mockRaftActor.waitForInitializeBehaviorComplete();
943
944                 ByteString snapshotBytes = fromObject(Arrays.asList(
945                         new MockRaftActorContext.MockPayload("A"),
946                         new MockRaftActorContext.MockPayload("B"),
947                         new MockRaftActorContext.MockPayload("C"),
948                         new MockRaftActorContext.MockPayload("D")));
949
950                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
951
952                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
953
954                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
955
956                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
957
958                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
959                         new Exception()));
960
961                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
962                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
963
964             }
965         };
966     }
967
968     @Test
969     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
970         new JavaTestKit(getSystem()) {{
971             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
972                     Props.create(MessageCollectorActor.class));
973             MessageCollectorActor.waitUntilReady(notifierActor);
974
975             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
976             long heartBeatInterval = 100;
977             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
978             config.setElectionTimeoutFactor(20);
979
980             String persistenceId = factory.generateActorId("notifier-");
981
982             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
983                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
984                     new NonPersistentProvider()), persistenceId);
985
986             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
987
988
989             // check if the notifier got a role change from null to Follower
990             RoleChanged raftRoleChanged = matches.get(0);
991             assertEquals(persistenceId, raftRoleChanged.getMemberId());
992             assertNull(raftRoleChanged.getOldRole());
993             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
994
995             // check if the notifier got a role change from Follower to Candidate
996             raftRoleChanged = matches.get(1);
997             assertEquals(persistenceId, raftRoleChanged.getMemberId());
998             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
999             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1000
1001             // check if the notifier got a role change from Candidate to Leader
1002             raftRoleChanged = matches.get(2);
1003             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1004             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1005             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1006
1007             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1008                     notifierActor, LeaderStateChanged.class);
1009
1010             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1011
1012             notifierActor.underlyingActor().clear();
1013
1014             MockRaftActor raftActor = raftActorRef.underlyingActor();
1015             final String newLeaderId = "new-leader";
1016             Follower follower = new Follower(raftActor.getRaftActorContext()) {
1017                 @Override
1018                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1019                     leaderId = newLeaderId;
1020                     return this;
1021                 }
1022             };
1023
1024             raftActor.changeCurrentBehavior(follower);
1025
1026             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1027             assertEquals(persistenceId, leaderStateChange.getMemberId());
1028             assertEquals(null, leaderStateChange.getLeaderId());
1029
1030             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1031             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1032             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1033
1034             notifierActor.underlyingActor().clear();
1035
1036             raftActor.handleCommand("any");
1037
1038             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1039             assertEquals(persistenceId, leaderStateChange.getMemberId());
1040             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1041         }};
1042     }
1043
1044     @Test
1045     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1046         new JavaTestKit(getSystem()) {{
1047             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1048             MessageCollectorActor.waitUntilReady(notifierActor);
1049
1050             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1051             long heartBeatInterval = 100;
1052             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1053             config.setElectionTimeoutFactor(1);
1054
1055             String persistenceId = factory.generateActorId("notifier-");
1056
1057             factory.createActor(MockRaftActor.props(persistenceId,
1058                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1059
1060             List<RoleChanged> matches =  null;
1061             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1062                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1063                 assertNotNull(matches);
1064                 if(matches.size() == 3) {
1065                     break;
1066                 }
1067                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1068             }
1069
1070             assertEquals(2, matches.size());
1071
1072             // check if the notifier got a role change from null to Follower
1073             RoleChanged raftRoleChanged = matches.get(0);
1074             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1075             assertNull(raftRoleChanged.getOldRole());
1076             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1077
1078             // check if the notifier got a role change from Follower to Candidate
1079             raftRoleChanged = matches.get(1);
1080             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1081             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1082             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1083
1084         }};
1085     }
1086
1087     @Test
1088     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1089         new JavaTestKit(getSystem()) {
1090             {
1091                 String persistenceId = factory.generateActorId("leader-");
1092                 String follower1Id = factory.generateActorId("follower-");
1093
1094                 ActorRef followerActor1 =
1095                         factory.createActor(Props.create(MessageCollectorActor.class));
1096
1097                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1098                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1099                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1100
1101                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1102
1103                 Map<String, String> peerAddresses = new HashMap<>();
1104                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1105
1106                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1107                         MockRaftActor.props(persistenceId, peerAddresses,
1108                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1109
1110                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1111
1112                 leaderActor.getRaftActorContext().setCommitIndex(4);
1113                 leaderActor.getRaftActorContext().setLastApplied(4);
1114                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1115
1116                 leaderActor.waitForInitializeBehaviorComplete();
1117
1118                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1119
1120                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1121                 leaderActor.setCurrentBehavior(leader);
1122                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1123
1124                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1125                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1126
1127                 assertEquals(8, leaderActor.getReplicatedLog().size());
1128
1129                 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
1130
1131                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1132                 verify(leaderActor.delegate).createSnapshot();
1133
1134                 assertEquals(8, leaderActor.getReplicatedLog().size());
1135
1136                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1137                 //fake snapshot on index 5
1138                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1139
1140                 assertEquals(8, leaderActor.getReplicatedLog().size());
1141
1142                 //fake snapshot on index 6
1143                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1144                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1145                 assertEquals(8, leaderActor.getReplicatedLog().size());
1146
1147                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1148
1149                 assertEquals(8, leaderActor.getReplicatedLog().size());
1150
1151                 ByteString snapshotBytes = fromObject(Arrays.asList(
1152                         new MockRaftActorContext.MockPayload("foo-0"),
1153                         new MockRaftActorContext.MockPayload("foo-1"),
1154                         new MockRaftActorContext.MockPayload("foo-2"),
1155                         new MockRaftActorContext.MockPayload("foo-3"),
1156                         new MockRaftActorContext.MockPayload("foo-4")));
1157                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1158                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1159
1160                 // capture snapshot reply should remove the snapshotted entries only
1161                 assertEquals(3, leaderActor.getReplicatedLog().size());
1162                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1163
1164                 // add another non-replicated entry
1165                 leaderActor.getReplicatedLog().append(
1166                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1167
1168                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1169                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1170                 assertEquals(2, leaderActor.getReplicatedLog().size());
1171                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1172
1173             }
1174         };
1175     }
1176
1177     @Test
1178     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1179         new JavaTestKit(getSystem()) {
1180             {
1181                 String persistenceId = factory.generateActorId("follower-");
1182                 String leaderId = factory.generateActorId("leader-");
1183
1184
1185                 ActorRef leaderActor1 =
1186                         factory.createActor(Props.create(MessageCollectorActor.class));
1187
1188                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1189                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1190                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1191
1192                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1193
1194                 Map<String, String> peerAddresses = new HashMap<>();
1195                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1196
1197                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1198                         MockRaftActor.props(persistenceId, peerAddresses,
1199                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1200
1201                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1202                 followerActor.getRaftActorContext().setCommitIndex(4);
1203                 followerActor.getRaftActorContext().setLastApplied(4);
1204                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1205
1206                 followerActor.waitForInitializeBehaviorComplete();
1207
1208
1209                 Follower follower = new Follower(followerActor.getRaftActorContext());
1210                 followerActor.setCurrentBehavior(follower);
1211                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1212
1213                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1214                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1215                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1216
1217                 // log has indices 0-5
1218                 assertEquals(6, followerActor.getReplicatedLog().size());
1219
1220                 //snapshot on 4
1221                 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1222
1223                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1224                 verify(followerActor.delegate).createSnapshot();
1225
1226                 assertEquals(6, followerActor.getReplicatedLog().size());
1227
1228                 //fake snapshot on index 6
1229                 List<ReplicatedLogEntry> entries =
1230                         Arrays.asList(
1231                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1232                                         new MockRaftActorContext.MockPayload("foo-6"))
1233                         );
1234                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1235                 assertEquals(7, followerActor.getReplicatedLog().size());
1236
1237                 //fake snapshot on index 7
1238                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1239
1240                 entries =
1241                         Arrays.asList(
1242                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1243                                         new MockRaftActorContext.MockPayload("foo-7"))
1244                         );
1245                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1246                 assertEquals(8, followerActor.getReplicatedLog().size());
1247
1248                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1249
1250
1251                 ByteString snapshotBytes = fromObject(Arrays.asList(
1252                         new MockRaftActorContext.MockPayload("foo-0"),
1253                         new MockRaftActorContext.MockPayload("foo-1"),
1254                         new MockRaftActorContext.MockPayload("foo-2"),
1255                         new MockRaftActorContext.MockPayload("foo-3"),
1256                         new MockRaftActorContext.MockPayload("foo-4")));
1257                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1258                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1259
1260                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1261                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1262                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1263
1264                 entries =
1265                         Arrays.asList(
1266                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1267                                         new MockRaftActorContext.MockPayload("foo-7"))
1268                         );
1269                 // send an additional entry 8 with leaderCommit = 7
1270                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1271
1272                 // 7 and 8, as lastapplied is 7
1273                 assertEquals(2, followerActor.getReplicatedLog().size());
1274
1275             }
1276         };
1277     }
1278
1279     @Test
1280     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1281         new JavaTestKit(getSystem()) {
1282             {
1283                 String persistenceId = factory.generateActorId("leader-");
1284                 String follower1Id = factory.generateActorId("follower-");
1285                 String follower2Id = factory.generateActorId("follower-");
1286
1287                 ActorRef followerActor1 =
1288                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1289                 ActorRef followerActor2 =
1290                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1291
1292                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1293                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1294                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1295
1296                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1297
1298                 Map<String, String> peerAddresses = new HashMap<>();
1299                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1300                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1301
1302                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1303                         MockRaftActor.props(persistenceId, peerAddresses,
1304                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1305
1306                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1307                 leaderActor.getRaftActorContext().setCommitIndex(9);
1308                 leaderActor.getRaftActorContext().setLastApplied(9);
1309                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1310
1311                 leaderActor.waitForInitializeBehaviorComplete();
1312
1313                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1314                 leaderActor.setCurrentBehavior(leader);
1315                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1316
1317                 // create 5 entries in the log
1318                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1319                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1320
1321                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1322                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1323                 //setting replicatedToAllIndex = 9, for the log to clear
1324                 leader.setReplicatedToAllIndex(9);
1325                 assertEquals(5, leaderActor.getReplicatedLog().size());
1326                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1327
1328                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1329                 assertEquals(5, leaderActor.getReplicatedLog().size());
1330                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1331
1332                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1333                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1334                 assertEquals(5, leaderActor.getReplicatedLog().size());
1335                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1336
1337                 // simulate a real snapshot
1338                 leaderActor.onReceiveCommand(new SendHeartBeat());
1339                 assertEquals(5, leaderActor.getReplicatedLog().size());
1340                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1341                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1342                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1343
1344
1345                 //reply from a slow follower does not initiate a fake snapshot
1346                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1347                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1348
1349                 ByteString snapshotBytes = fromObject(Arrays.asList(
1350                         new MockRaftActorContext.MockPayload("foo-0"),
1351                         new MockRaftActorContext.MockPayload("foo-1"),
1352                         new MockRaftActorContext.MockPayload("foo-2"),
1353                         new MockRaftActorContext.MockPayload("foo-3"),
1354                         new MockRaftActorContext.MockPayload("foo-4")));
1355                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1356                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1357
1358                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1359
1360                 //reply from a slow follower after should not raise errors
1361                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1362                 assertEquals(0, leaderActor.getReplicatedLog().size());
1363             }
1364         };
1365     }
1366
1367
1368     private static class NonPersistentProvider implements DataPersistenceProvider {
1369         @Override
1370         public boolean isRecoveryApplicable() {
1371             return false;
1372         }
1373
1374         @Override
1375         public <T> void persist(T o, Procedure<T> procedure) {
1376             try {
1377                 procedure.apply(o);
1378             } catch (Exception e) {
1379                 e.printStackTrace();
1380             }
1381         }
1382
1383         @Override
1384         public void saveSnapshot(Object o) {
1385
1386         }
1387
1388         @Override
1389         public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1390
1391         }
1392
1393         @Override
1394         public void deleteMessages(long sequenceNumber) {
1395
1396         }
1397     }
1398
1399     @Test
1400     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1401         new JavaTestKit(getSystem()) {{
1402             String persistenceId = factory.generateActorId("leader-");
1403             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1404             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1405             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1406             config.setSnapshotBatchCount(5);
1407
1408             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1409
1410             Map<String, String> peerAddresses = new HashMap<>();
1411
1412             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1413                     MockRaftActor.props(persistenceId, peerAddresses,
1414                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1415
1416             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1417             leaderActor.getRaftActorContext().setCommitIndex(3);
1418             leaderActor.getRaftActorContext().setLastApplied(3);
1419             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1420
1421             leaderActor.waitForInitializeBehaviorComplete();
1422             for(int i=0;i< 4;i++) {
1423                 leaderActor.getReplicatedLog()
1424                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1425                                 new MockRaftActorContext.MockPayload("A")));
1426             }
1427
1428             Leader leader = new Leader(leaderActor.getRaftActorContext());
1429             leaderActor.setCurrentBehavior(leader);
1430             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1431
1432             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1433             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1434
1435             // Now send a CaptureSnapshotReply
1436             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1437
1438             // Trimming log in this scenario is a no-op
1439             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1440             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1441             assertEquals(-1, leader.getReplicatedToAllIndex());
1442
1443         }};
1444     }
1445
1446     @Test
1447     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1448         new JavaTestKit(getSystem()) {{
1449             String persistenceId = factory.generateActorId("leader-");
1450             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1451             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1452             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1453             config.setSnapshotBatchCount(5);
1454
1455             DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1456
1457             Map<String, String> peerAddresses = new HashMap<>();
1458
1459             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1460                     MockRaftActor.props(persistenceId, peerAddresses,
1461                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1462
1463             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1464             leaderActor.getRaftActorContext().setCommitIndex(3);
1465             leaderActor.getRaftActorContext().setLastApplied(3);
1466             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1467             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1468
1469             leaderActor.waitForInitializeBehaviorComplete();
1470             Leader leader = new Leader(leaderActor.getRaftActorContext());
1471             leaderActor.setCurrentBehavior(leader);
1472             leader.setReplicatedToAllIndex(3);
1473             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1474
1475             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1476             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1477
1478             // Now send a CaptureSnapshotReply
1479             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1480
1481             // Trimming log in this scenario is a no-op
1482             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1483             assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1484             assertEquals(3, leader.getReplicatedToAllIndex());
1485
1486         }};
1487     }
1488
1489     private ByteString fromObject(Object snapshot) throws Exception {
1490         ByteArrayOutputStream b = null;
1491         ObjectOutputStream o = null;
1492         try {
1493             b = new ByteArrayOutputStream();
1494             o = new ObjectOutputStream(b);
1495             o.writeObject(snapshot);
1496             byte[] snapshotBytes = b.toByteArray();
1497             return ByteString.copyFrom(snapshotBytes);
1498         } finally {
1499             if (o != null) {
1500                 o.flush();
1501                 o.close();
1502             }
1503             if (b != null) {
1504                 b.close();
1505             }
1506         }
1507     }
1508
1509 }