Merge "Export BindingNormalizedNodeSerializer via Config Subsystem."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
58 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RoleChanged;
61 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
66 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
67 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
68 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
69 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
70 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
71 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
72 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
73 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
74 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
75 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
76 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
77 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import scala.concurrent.Await;
80 import scala.concurrent.Future;
81 import scala.concurrent.duration.Duration;
82 import scala.concurrent.duration.FiniteDuration;
83
84 public class RaftActorTest extends AbstractActorTest {
85
86     private TestActorFactory factory;
87
88     @Before
89     public void setUp(){
90         factory = new TestActorFactory(getSystem());
91     }
92
93     @After
94     public void tearDown() throws Exception {
95         factory.close();
96         InMemoryJournal.clear();
97         InMemorySnapshotStore.clear();
98     }
99
100     public static class MockRaftActor extends RaftActor {
101
102         private final RaftActor delegate;
103         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
104         private final List<Object> state;
105         private ActorRef roleChangeNotifier;
106         private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
107
108         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
109             private static final long serialVersionUID = 1L;
110             private final Map<String, String> peerAddresses;
111             private final String id;
112             private final Optional<ConfigParams> config;
113             private final DataPersistenceProvider dataPersistenceProvider;
114             private final ActorRef roleChangeNotifier;
115
116             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
117                 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
118                 ActorRef roleChangeNotifier) {
119                 this.peerAddresses = peerAddresses;
120                 this.id = id;
121                 this.config = config;
122                 this.dataPersistenceProvider = dataPersistenceProvider;
123                 this.roleChangeNotifier = roleChangeNotifier;
124             }
125
126             @Override
127             public MockRaftActor create() throws Exception {
128                 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
129                     dataPersistenceProvider);
130                 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
131                 return mockRaftActor;
132             }
133         }
134
135         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
136                              DataPersistenceProvider dataPersistenceProvider) {
137             super(id, peerAddresses, config);
138             state = new ArrayList<>();
139             this.delegate = mock(RaftActor.class);
140             if(dataPersistenceProvider == null){
141                 setPersistence(true);
142             } else {
143                 setPersistence(dataPersistenceProvider);
144             }
145         }
146
147         public void waitForRecoveryComplete() {
148             try {
149                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
150             } catch (InterruptedException e) {
151                 e.printStackTrace();
152             }
153         }
154
155         public void waitForInitializeBehaviorComplete() {
156             try {
157                 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
158             } catch (InterruptedException e) {
159                 e.printStackTrace();
160             }
161         }
162
163
164         public void waitUntilLeader(){
165             for(int i = 0;i < 10; i++){
166                 if(isLeader()){
167                     break;
168                 }
169                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
170             }
171         }
172
173         public List<Object> getState() {
174             return state;
175         }
176
177         public static Props props(final String id, final Map<String, String> peerAddresses,
178                 Optional<ConfigParams> config){
179             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
180         }
181
182         public static Props props(final String id, final Map<String, String> peerAddresses,
183                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
184             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
185         }
186
187         public static Props props(final String id, final Map<String, String> peerAddresses,
188             Optional<ConfigParams> config, ActorRef roleChangeNotifier){
189             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
190         }
191
192         public static Props props(final String id, final Map<String, String> peerAddresses,
193                                   Optional<ConfigParams> config, ActorRef roleChangeNotifier,
194                                   DataPersistenceProvider dataPersistenceProvider){
195             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
196         }
197
198
199         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
200             delegate.applyState(clientActor, identifier, data);
201             LOG.info("{}: applyState called", persistenceId());
202         }
203
204         @Override
205         protected void startLogRecoveryBatch(int maxBatchSize) {
206         }
207
208         @Override
209         protected void appendRecoveredLogEntry(Payload data) {
210             state.add(data);
211         }
212
213         @Override
214         protected void applyCurrentLogRecoveryBatch() {
215         }
216
217         @Override
218         protected void onRecoveryComplete() {
219             delegate.onRecoveryComplete();
220             recoveryComplete.countDown();
221         }
222
223         @Override
224         protected void initializeBehavior() {
225             super.initializeBehavior();
226             initializeBehaviorComplete.countDown();
227         }
228
229         @Override
230         protected void applyRecoverySnapshot(byte[] bytes) {
231             delegate.applyRecoverySnapshot(bytes);
232             try {
233                 Object data = toObject(bytes);
234                 if (data instanceof List) {
235                     state.addAll((List<?>) data);
236                 }
237             } catch (Exception e) {
238                 e.printStackTrace();
239             }
240         }
241
242         @Override protected void createSnapshot() {
243             LOG.info("{}: createSnapshot called", persistenceId());
244             delegate.createSnapshot();
245         }
246
247         @Override protected void applySnapshot(byte [] snapshot) {
248             LOG.info("{}: applySnapshot called", persistenceId());
249             delegate.applySnapshot(snapshot);
250         }
251
252         @Override protected void onStateChanged() {
253             delegate.onStateChanged();
254         }
255
256         @Override
257         protected Optional<ActorRef> getRoleChangeNotifier() {
258             return Optional.fromNullable(roleChangeNotifier);
259         }
260
261         @Override public String persistenceId() {
262             return this.getId();
263         }
264
265         private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
266             Object obj = null;
267             ByteArrayInputStream bis = null;
268             ObjectInputStream ois = null;
269             try {
270                 bis = new ByteArrayInputStream(bs);
271                 ois = new ObjectInputStream(bis);
272                 obj = ois.readObject();
273             } finally {
274                 if (bis != null) {
275                     bis.close();
276                 }
277                 if (ois != null) {
278                     ois.close();
279                 }
280             }
281             return obj;
282         }
283
284         public ReplicatedLog getReplicatedLog(){
285             return this.getRaftActorContext().getReplicatedLog();
286         }
287
288     }
289
290
291     public static class RaftActorTestKit extends JavaTestKit {
292         private final ActorRef raftActor;
293
294         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
295             super(actorSystem);
296
297             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
298                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
299
300         }
301
302
303         public ActorRef getRaftActor() {
304             return raftActor;
305         }
306
307         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
308             // Wait for a specific log message to show up
309             return
310                 new JavaTestKit.EventFilter<Boolean>(logEventClass
311                 ) {
312                     @Override
313                     protected Boolean run() {
314                         return true;
315                     }
316                 }.from(raftActor.path().toString())
317                     .message(message)
318                     .occurrences(1).exec();
319
320
321         }
322
323         protected void waitUntilLeader(){
324             waitUntilLeader(raftActor);
325         }
326
327         public static void waitUntilLeader(ActorRef actorRef) {
328             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
329             for(int i = 0; i < 20 * 5; i++) {
330                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
331                 try {
332                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
333                     if(resp.getLeaderActor() != null) {
334                         return;
335                     }
336                 } catch(TimeoutException e) {
337                 } catch(Exception e) {
338                     System.err.println("FindLeader threw ex");
339                     e.printStackTrace();
340                 }
341
342
343                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
344             }
345
346             Assert.fail("Leader not found for actorRef " + actorRef.path());
347         }
348
349     }
350
351
352     @Test
353     public void testConstruction() {
354         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
355     }
356
357     @Test
358     public void testFindLeaderWhenLeaderIsSelf(){
359         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
360         kit.waitUntilLeader();
361     }
362
363     @Test
364     public void testRaftActorRecovery() throws Exception {
365         new JavaTestKit(getSystem()) {{
366             String persistenceId = factory.generateActorId("follower-");
367
368             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
369             // Set the heartbeat interval high to essentially disable election otherwise the test
370             // may fail if the actor is switched to Leader and the commitIndex is set to the last
371             // log entry.
372             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
373
374             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
375                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
376
377             watch(followerActor);
378
379             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
380             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
381                     new MockRaftActorContext.MockPayload("E"));
382             snapshotUnappliedEntries.add(entry1);
383
384             int lastAppliedDuringSnapshotCapture = 3;
385             int lastIndexDuringSnapshotCapture = 4;
386
387             // 4 messages as part of snapshot, which are applied to state
388             ByteString snapshotBytes = fromObject(Arrays.asList(
389                     new MockRaftActorContext.MockPayload("A"),
390                     new MockRaftActorContext.MockPayload("B"),
391                     new MockRaftActorContext.MockPayload("C"),
392                     new MockRaftActorContext.MockPayload("D")));
393
394             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
395                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
396                     lastAppliedDuringSnapshotCapture, 1);
397             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
398
399             // add more entries after snapshot is taken
400             List<ReplicatedLogEntry> entries = new ArrayList<>();
401             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
402                     new MockRaftActorContext.MockPayload("F"));
403             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
404                     new MockRaftActorContext.MockPayload("G"));
405             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
406                     new MockRaftActorContext.MockPayload("H"));
407             entries.add(entry2);
408             entries.add(entry3);
409             entries.add(entry4);
410
411             int lastAppliedToState = 5;
412             int lastIndex = 7;
413
414             InMemoryJournal.addEntry(persistenceId, 5, entry2);
415             // 2 entries are applied to state besides the 4 entries in snapshot
416             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
417             InMemoryJournal.addEntry(persistenceId, 7, entry3);
418             InMemoryJournal.addEntry(persistenceId, 8, entry4);
419
420             // kill the actor
421             followerActor.tell(PoisonPill.getInstance(), null);
422             expectMsgClass(duration("5 seconds"), Terminated.class);
423
424             unwatch(followerActor);
425
426             //reinstate the actor
427             TestActorRef<MockRaftActor> ref = factory.createTestActor(
428                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
429                             Optional.<ConfigParams>of(config)));
430
431             ref.underlyingActor().waitForRecoveryComplete();
432
433             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
434             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
435                     context.getReplicatedLog().size());
436             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
437             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
438             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
439             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
440         }};
441     }
442
443     @Test
444     public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
445         new JavaTestKit(getSystem()) {{
446             String persistenceId = factory.generateActorId("leader-");
447
448             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
449             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
450
451             // Setup the persisted journal with some entries
452             ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
453                     new MockRaftActorContext.MockPayload("zero"));
454             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
455                     new MockRaftActorContext.MockPayload("oen"));
456             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
457                     new MockRaftActorContext.MockPayload("two"));
458
459             long seqNr = 1;
460             InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
461             InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
462             InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
463             InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
464
465             int lastAppliedToState = 1;
466             int lastIndex = 2;
467
468             //reinstate the actor
469             TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
470                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
471                             Optional.<ConfigParams>of(config)));
472
473             leaderActor.underlyingActor().waitForRecoveryComplete();
474
475             RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
476             assertEquals("Journal log size", 3, context.getReplicatedLog().size());
477             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
478             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
479             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
480         }};
481     }
482
483     /**
484      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
485      * process recovery messages
486      *
487      * @throws Exception
488      */
489
490     @Test
491     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
492         new JavaTestKit(getSystem()) {
493             {
494                 String persistenceId = factory.generateActorId("leader-");
495
496                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
497
498                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
499
500                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
501                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
502
503                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
504
505                 // Wait for akka's recovery to complete so it doesn't interfere.
506                 mockRaftActor.waitForRecoveryComplete();
507
508                 ByteString snapshotBytes = fromObject(Arrays.asList(
509                         new MockRaftActorContext.MockPayload("A"),
510                         new MockRaftActorContext.MockPayload("B"),
511                         new MockRaftActorContext.MockPayload("C"),
512                         new MockRaftActorContext.MockPayload("D")));
513
514                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
515                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
516
517                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
518
519                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
520
521                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
522
523                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
524
525                 assertEquals("add replicated log entry", 1, replicatedLog.size());
526
527                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
528
529                 assertEquals("add replicated log entry", 2, replicatedLog.size());
530
531                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
532
533                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
534
535                 // The snapshot had 4 items + we added 2 more items during the test
536                 // We start removing from 5 and we should get 1 item in the replicated log
537                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
538
539                 assertEquals("remove log entries", 1, replicatedLog.size());
540
541                 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
542
543                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
544                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
545
546                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
547
548             }};
549     }
550
551     /**
552      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
553      * not process recovery messages
554      *
555      * @throws Exception
556      */
557     @Test
558     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
559         new JavaTestKit(getSystem()) {
560             {
561                 String persistenceId = factory.generateActorId("leader-");
562
563                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
564
565                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
566
567                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
568                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
569
570                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
571
572                 // Wait for akka's recovery to complete so it doesn't interfere.
573                 mockRaftActor.waitForRecoveryComplete();
574
575                 ByteString snapshotBytes = fromObject(Arrays.asList(
576                         new MockRaftActorContext.MockPayload("A"),
577                         new MockRaftActorContext.MockPayload("B"),
578                         new MockRaftActorContext.MockPayload("C"),
579                         new MockRaftActorContext.MockPayload("D")));
580
581                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
582                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
583
584                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
585
586                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
587
588                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
589
590                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
591
592                 assertEquals("add replicated log entry", 0, replicatedLog.size());
593
594                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
595
596                 assertEquals("add replicated log entry", 0, replicatedLog.size());
597
598                 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
599
600                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
601
602                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
603
604                 assertEquals("remove log entries", 0, replicatedLog.size());
605
606                 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
607
608                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
609                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
610
611                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
612             }};
613     }
614
615
616     @Test
617     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
618         new JavaTestKit(getSystem()) {
619             {
620                 String persistenceId = factory.generateActorId("leader-");
621
622                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
623
624                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
625
626                 CountDownLatch persistLatch = new CountDownLatch(1);
627                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
628                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
629
630                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
631                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
632
633                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
634
635                 mockRaftActor.waitForInitializeBehaviorComplete();
636
637                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
638
639                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
640             }
641         };
642     }
643
644     @Test
645     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
646         new JavaTestKit(getSystem()) {
647             {
648                 String persistenceId = factory.generateActorId("leader-");
649
650                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
651
652                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
653
654                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
655
656                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
657                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
658
659                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
660
661                 mockRaftActor.waitForInitializeBehaviorComplete();
662
663                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
664
665                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
666
667                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
668             }
669         };
670     }
671
672     @Test
673     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
            // Checks that removeFromAndPersist() on the replicated log goes through the mocked
            // DataPersistenceProvider, by counting the persist() calls recorded on the mock.
674         new JavaTestKit(getSystem()) {
675             {
676                 String persistenceId = factory.generateActorId("leader-");
677
678                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
679
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
680                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
681
682                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
683
684                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
685                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
686
687                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
688
689                 mockRaftActor.waitForInitializeBehaviorComplete();
690
691                 mockRaftActor.waitUntilLeader();
692
                    // Put one entry into the log so there is something to remove.
693                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
694
695                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
696
                    // NOTE(review): three persist() calls are expected in total. This test issues one
                    // appendAndPersist() and one removeFromAndPersist(); the remaining call presumably
                    // happens while the actor becomes leader - confirm against RaftActor.
697                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
698             }
699         };
700     }
701
702     @Test
703     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
            // Checks that delivering an ApplyJournalEntries message to the actor results in persist()
            // calls on the mocked DataPersistenceProvider.
704         new JavaTestKit(getSystem()) {
705             {
706                 String persistenceId = factory.generateActorId("leader-");
707
708                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
709
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
710                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
711
712                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
713
714                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
715                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
716
717                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
718
719                 mockRaftActor.waitForInitializeBehaviorComplete();
720
721                 mockRaftActor.waitUntilLeader();
722
                    // The message is handed to the actor directly instead of going through its mailbox.
723                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
724
                    // NOTE(review): two persist() calls are expected; presumably one is made while the
                    // actor becomes leader and one for the ApplyJournalEntries message - confirm.
725                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
726
727             }
728
729         };
730     }
731
732     @Test
733     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
            // Checks that a CaptureSnapshotReply received after a capture was started makes the actor
            // call saveSnapshot() on the mocked DataPersistenceProvider.
734         new JavaTestKit(getSystem()) {
735             {
736                 String persistenceId = factory.generateActorId("leader-");
737
738                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
739
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
740                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
741
742                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
743
744                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
745                         MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
746                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
747
748                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
749
750                 mockRaftActor.waitForInitializeBehaviorComplete();
751
                    // Snapshot state carried by the reply below. fromObject() is not defined in this
                    // block; presumably it serializes the list into a ByteString - confirm.
752                 ByteString snapshotBytes = fromObject(Arrays.asList(
753                         new MockRaftActorContext.MockPayload("A"),
754                         new MockRaftActorContext.MockPayload("B"),
755                         new MockRaftActorContext.MockPayload("C"),
756                         new MockRaftActorContext.MockPayload("D")));
757
758                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
759
                    // Start a capture on the SnapshotManager before the reply is delivered.
                    // NOTE(review): presumably the reply is only acted upon while a capture is pending - confirm.
760                 raftActorContext.getSnapshotManager().capture(
761                         new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
762                                 new MockRaftActorContext.MockPayload("D")), -1);
763
764                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
765
766                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
767
                    // Exactly one saveSnapshot() call is expected on the mocked provider.
768                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
769
770             }
771         };
772     }
773
774     @Test
775     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
            // Checks that a SaveSnapshotSuccess message, arriving after a capture and its
            // CaptureSnapshotReply, makes the actor call deleteSnapshots() and deleteMessages() on the
            // mocked DataPersistenceProvider, and checks which entries remain in the replicated log.
776         new JavaTestKit(getSystem()) {
777             {
778                 String persistenceId = factory.generateActorId("leader-");
779
780                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
781
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
782                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
783
784                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
785
                    // Unlike the tests with an empty peer map, this actor gets one peer entry.
786                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
787                         ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
788
789                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
790
791                 mockRaftActor.waitForInitializeBehaviorComplete();
792                 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
793
                    // Five entries are appended; the second constructor argument runs from 0 to 4 and
                    // matches the indices looked up with get() at the end of the test.
794                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
795                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
796                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
797                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
798                 mockRaftActor.getReplicatedLog().append(lastEntry);
799
800                 ByteString snapshotBytes = fromObject(Arrays.asList(
801                         new MockRaftActorContext.MockPayload("A"),
802                         new MockRaftActorContext.MockPayload("B"),
803                         new MockRaftActorContext.MockPayload("C"),
804                         new MockRaftActorContext.MockPayload("D")));
805
806                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
807                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
808
809                 long replicatedToAllIndex = 1;
810
811                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
812
                    // The capture is expected to have asked the delegate for a snapshot.
813                 verify(mockRaftActor.delegate).createSnapshot();
814
815                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
816
817                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
818
819                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
820
                    // 100 is the value given in the SnapshotMetadata of the success message above.
821                 verify(dataPersistenceProvider).deleteMessages(100);
822
823                 assertEquals(3, mockRaftActor.getReplicatedLog().size());
824                 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
825
826                 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
827                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
828                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
829
830                 // Indices 0 and 1 will not be in the log because they were removed due to snapshotting
831                 assertNull(mockRaftActor.getReplicatedLog().get(1));
832                 assertNull(mockRaftActor.getReplicatedLog().get(0));
833
834             }
835         };
836     }
837
838     @Test
839     public void testApplyState() throws Exception {
            // Checks that an ApplyState message delivered to the actor is forwarded to
            // delegate.applyState() with the same client actor reference and identifier.
840
841         new JavaTestKit(getSystem()) {
842             {
843                 String persistenceId = factory.generateActorId("leader-");
844
845                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
846
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
847                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
848
849                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
850
851                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
852                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
853
854                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
855
856                 mockRaftActor.waitForInitializeBehaviorComplete();
857
858                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
859                         new MockRaftActorContext.MockPayload("F"));
860
                    // The actor reference of the raft actor itself is used as the client actor here.
861                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
862
                    // Only the actor reference and the identifier are matched exactly; the third
                    // argument is accepted as anything.
863                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
864
865             }
866         };
867     }
868
869     @Test
870     public void testApplySnapshot() throws Exception {
            // Checks the handling of an ApplySnapshot message: the snapshot state is passed to
            // delegate.applySnapshot(), the replicated log instance is replaced by an empty one and
            // lastApplied takes the value reported by the snapshot.
871         new JavaTestKit(getSystem()) {
872             {
873                 String persistenceId = factory.generateActorId("leader-");
874
875                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
876
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
877                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
878
                    // A DataPersistenceProviderMonitor instance is used here rather than a Mockito mock.
879                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
880
881                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
882                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
883
884                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
885
886                 mockRaftActor.waitForInitializeBehaviorComplete();
887
                    // Keep a reference to the current log so the identity check below can show that
                    // the actor swapped it for a different instance.
888                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
889
                    // Seed the current log with three entries; the size check at the end expects 0.
890                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
891                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
892                 oldReplicatedLog.append(
893                         new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
894                                 mock(Payload.class)));
895
896                 ByteString snapshotBytes = fromObject(Arrays.asList(
897                         new MockRaftActorContext.MockPayload("A"),
898                         new MockRaftActorContext.MockPayload("B"),
899                         new MockRaftActorContext.MockPayload("C"),
900                         new MockRaftActorContext.MockPayload("D")));
901
                    // The Snapshot is a Mockito mock; only getState() and getLastAppliedIndex() are stubbed.
902                 Snapshot snapshot = mock(Snapshot.class);
903
904                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
905
906                 doReturn(3L).when(snapshot).getLastAppliedIndex();
907
908                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
909
910                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
911
912                 assertTrue("The replicatedLog should have changed",
913                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
914
915                 assertEquals("lastApplied should be same as in the snapshot",
916                         (Long) 3L, mockRaftActor.getLastApplied());
917
918                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
919
920             }
921         };
922     }
923
924     @Test
925     public void testSaveSnapshotFailure() throws Exception {
            // Checks that a SaveSnapshotFailure message, arriving after a capture and its
            // CaptureSnapshotReply, leaves the snapshot index of the replicated log at -1.
926         new JavaTestKit(getSystem()) {
927             {
928                 String persistenceId = factory.generateActorId("leader-");
929
930                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
931
                    // NOTE(review): presumably the one-day interval keeps heartbeats from firing
                    // while the test runs - confirm.
932                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
933
                    // A DataPersistenceProviderMonitor instance is used here rather than a Mockito mock.
934                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
935
936                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
937                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
938
939                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
940
941                 mockRaftActor.waitForInitializeBehaviorComplete();
942
943                 ByteString snapshotBytes = fromObject(Arrays.asList(
944                         new MockRaftActorContext.MockPayload("A"),
945                         new MockRaftActorContext.MockPayload("B"),
946                         new MockRaftActorContext.MockPayload("C"),
947                         new MockRaftActorContext.MockPayload("D")));
948
949                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
950
951                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
952
                    // Start a capture, then deliver the reply carrying the snapshot bytes.
953                 raftActorContext.getSnapshotManager().capture(
954                         new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
955                                 new MockRaftActorContext.MockPayload("D")), 1);
956
957                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
958
                    // Report the save as failed instead of sending SaveSnapshotSuccess.
959                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
960                         new Exception()));
961
962                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
963                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
964
965             }
966         };
967     }
968
969     @Test
970     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
            // Checks the RoleChanged and LeaderStateChanged notifications that a raft actor without
            // peers sends to its notifier actor: first during start-up (three role changes ending in
            // Leader), then after the test installs a Follower behavior by hand.
971         new JavaTestKit(getSystem()) {{
972             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
973                     Props.create(MessageCollectorActor.class));
974             MessageCollectorActor.waitUntilReady(notifierActor);
975
976             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
977             long heartBeatInterval = 100;
978             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
979             config.setElectionTimeoutFactor(20);
980
981             String persistenceId = factory.generateActorId("notifier-");
982
                // The actor is created with an empty peer map, the notifier actor and a
                // NonPersistentDataProvider.
983             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
984                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
985                     new NonPersistentDataProvider()), persistenceId);
986
987             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
988
989
990             // check if the notifier got a role change from null to Follower
991             RoleChanged raftRoleChanged = matches.get(0);
992             assertEquals(persistenceId, raftRoleChanged.getMemberId());
993             assertNull(raftRoleChanged.getOldRole());
994             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
995
996             // check if the notifier got a role change from Follower to Candidate
997             raftRoleChanged = matches.get(1);
998             assertEquals(persistenceId, raftRoleChanged.getMemberId());
999             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1000             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1001
1002             // check if the notifier got a role change from Candidate to Leader
1003             raftRoleChanged = matches.get(2);
1004             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1005             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1006             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1007
                 // The leader id announced to the notifier must be the id of this actor.
1008             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1009                     notifierActor, LeaderStateChanged.class);
1010
1011             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1012
                 // Drop the messages collected so far so the next expectations only see new ones.
1013             notifierActor.underlyingActor().clear();
1014
1015             MockRaftActor raftActor = raftActorRef.underlyingActor();
1016             final String newLeaderId = "new-leader";
                 // Follower stub: any handled message sets leaderId to newLeaderId and keeps this
                 // behavior as the current one.
1017             Follower follower = new Follower(raftActor.getRaftActorContext()) {
1018                 @Override
1019                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1020                     leaderId = newLeaderId;
1021                     return this;
1022                 }
1023             };
1024
1025             raftActor.changeCurrentBehavior(follower);
1026
                 // Right after the switch no message has been handled by the stub yet, so the
                 // announced leader id is expected to be null.
1027             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1028             assertEquals(persistenceId, leaderStateChange.getMemberId());
1029             assertEquals(null, leaderStateChange.getLeaderId());
1030
1031             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1032             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1033             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1034
1035             notifierActor.underlyingActor().clear();
1036
                 // Any command will do: the stub ignores the content and only sets the leader id,
                 // which is then expected to be announced to the notifier.
1037             raftActor.handleCommand("any");
1038
1039             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1040             assertEquals(persistenceId, leaderStateChange.getMemberId());
1041             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1042         }};
1043     }
1044
1045     @Test
1046     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
             // Checks the RoleChanged notifications sent to the notifier actor when the raft actor is
             // created with one peer entry: exactly two are expected, null to Follower and Follower
             // to Candidate.
1047         new JavaTestKit(getSystem()) {{
1048             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1049             MessageCollectorActor.waitUntilReady(notifierActor);
1050
1051             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1052             long heartBeatInterval = 100;
1053             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1054             config.setElectionTimeoutFactor(1);
1055
1056             String persistenceId = factory.generateActorId("notifier-");
1057
1058             factory.createActor(MockRaftActor.props(persistenceId,
1059                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1060
                 // Poll the collector up to 5000 / heartBeatInterval times, sleeping one heartbeat
                 // interval between polls. The loop only exits early when 3 role changes were seen,
                 // which the size assertion below then rejects; when the expected 2 changes are
                 // observed the loop runs through all iterations (about 5 seconds of sleeping).
                 // NOTE(review): presumably intentional, to give an unexpected third role change time
                 // to show up - confirm.
1061             List<RoleChanged> matches =  null;
1062             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1063                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1064                 assertNotNull(matches);
1065                 if(matches.size() == 3) {
1066                     break;
1067                 }
1068                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1069             }
1070
1071             assertEquals(2, matches.size());
1072
1073             // check if the notifier got a role change from null to Follower
1074             RoleChanged raftRoleChanged = matches.get(0);
1075             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1076             assertNull(raftRoleChanged.getOldRole());
1077             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1078
1079             // check if the notifier got a role change from Follower to Candidate
1080             raftRoleChanged = matches.get(1);
1081             assertEquals(persistenceId, raftRoleChanged.getMemberId());
1082             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1083             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1084
1085         }};
1086     }
1087
1088     @Test
1089     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
             // Drives a leader through a snapshot capture while AppendEntriesReply messages from its
             // single follower arrive, asserting the replicated log size after each step: all 8
             // entries stay in the log until the snapshot has been persisted and committed, after
             // which the log is expected to shrink.
1090         new JavaTestKit(getSystem()) {
1091             {
1092                 String persistenceId = factory.generateActorId("leader-");
1093                 String follower1Id = factory.generateActorId("follower-");
1094
1095                 ActorRef followerActor1 =
1096                         factory.createActor(Props.create(MessageCollectorActor.class));
1097
1098                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
                     // NOTE(review): presumably the one-day intervals keep heartbeats and isolated
                     // leader checks from firing while the test runs - confirm.
1099                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1100                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1101
1102                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1103
1104                 Map<String, String> peerAddresses = new HashMap<>();
1105                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1106
1107                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1108                         MockRaftActor.props(persistenceId, peerAddresses,
1109                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1110
1111                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1112
1113                 leaderActor.getRaftActorContext().setCommitIndex(4);
1114                 leaderActor.getRaftActorContext().setLastApplied(4);
1115                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1116
1117                 leaderActor.waitForInitializeBehaviorComplete();
1118
1119                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1120
1121                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1122                 leaderActor.setCurrentBehavior(leader);
1123                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1124
1125                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1126                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1127
1128                 assertEquals(8, leaderActor.getReplicatedLog().size());
1129
                     // Start the capture; the size assertions that follow expect the log to stay
                     // untouched while it is in progress.
1130                 leaderActor.getRaftActorContext().getSnapshotManager()
1131                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1132                                 new MockRaftActorContext.MockPayload("x")), 4);
1133
1134                 verify(leaderActor.delegate).createSnapshot();
1135
1136                 assertEquals(8, leaderActor.getReplicatedLog().size());
1137
1138                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1139                 //fake snapshot on index 5
1140                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1141
1142                 assertEquals(8, leaderActor.getReplicatedLog().size());
1143
1144                 //fake snapshot on index 6
1145                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1146                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1147                 assertEquals(8, leaderActor.getReplicatedLog().size());
1148
1149                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1150
                     // NOTE(review): same size check as a few lines above; no message is delivered
                     // to the actor in between.
1151                 assertEquals(8, leaderActor.getReplicatedLog().size());
1152
1153                 ByteString snapshotBytes = fromObject(Arrays.asList(
1154                         new MockRaftActorContext.MockPayload("foo-0"),
1155                         new MockRaftActorContext.MockPayload("foo-1"),
1156                         new MockRaftActorContext.MockPayload("foo-2"),
1157                         new MockRaftActorContext.MockPayload("foo-3"),
1158                         new MockRaftActorContext.MockPayload("foo-4")));
1159
                     // The snapshot bytes are handed to the SnapshotManager directly here instead of
                     // sending a CaptureSnapshotReply message to the actor.
1160                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1161                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1162
1163                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1164
1165                 // The commit is needed to complete the snapshot creation process
1166                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1167
1168                 // capture snapshot reply should remove the snapshotted entries only
1169                 assertEquals(3, leaderActor.getReplicatedLog().size());
1170                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1171
1172                 // add another non-replicated entry
                     // ReplicatedLogImplEntry is given 8 as its first argument; the lastIndex()
                     // assertion further down expects 8.
1173                 leaderActor.getReplicatedLog().append(
1174                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1175
1176                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1177                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1178                 assertEquals(2, leaderActor.getReplicatedLog().size());
1179                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1180
1181             }
1182         };
1183     }
1184
1185     @Test
1186     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
             // Drives a follower through a snapshot capture while AppendEntries messages from its
             // leader arrive, asserting the replicated log size after each step: the log keeps
             // growing while the capture is pending and is expected to shrink once the snapshot
             // reply has been processed and committed.
1187         new JavaTestKit(getSystem()) {
1188             {
1189                 String persistenceId = factory.generateActorId("follower-");
1190                 String leaderId = factory.generateActorId("leader-");
1191
1192
1193                 ActorRef leaderActor1 =
1194                         factory.createActor(Props.create(MessageCollectorActor.class));
1195
1196                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
                     // NOTE(review): presumably the one-day intervals keep heartbeats and isolated
                     // leader checks from firing while the test runs - confirm.
1197                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1198                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1199
1200                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1201
1202                 Map<String, String> peerAddresses = new HashMap<>();
1203                 peerAddresses.put(leaderId, leaderActor1.path().toString());
1204
1205                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1206                         MockRaftActor.props(persistenceId, peerAddresses,
1207                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1208
1209                 MockRaftActor followerActor = mockActorRef.underlyingActor();
1210                 followerActor.getRaftActorContext().setCommitIndex(4);
1211                 followerActor.getRaftActorContext().setLastApplied(4);
1212                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1213
1214                 followerActor.waitForInitializeBehaviorComplete();
1215
1216
1217                 Follower follower = new Follower(followerActor.getRaftActorContext());
1218                 followerActor.setCurrentBehavior(follower);
1219                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1220
1221                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1222                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1223                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1224
1225                 // log has indices 0-5
1226                 assertEquals(6, followerActor.getReplicatedLog().size());
1227
1228                 //snapshot on 4
1229                 followerActor.getRaftActorContext().getSnapshotManager().capture(
1230                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1231                                 new MockRaftActorContext.MockPayload("D")), 4);
1232
1233                 verify(followerActor.delegate).createSnapshot();
1234
1235                 assertEquals(6, followerActor.getReplicatedLog().size());
1236
1237                 //fake snapshot on index 6
1238                 List<ReplicatedLogEntry> entries =
1239                         Arrays.asList(
1240                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1241                                         new MockRaftActorContext.MockPayload("foo-6"))
1242                         );
1243                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1244                 assertEquals(7, followerActor.getReplicatedLog().size());
1245
1246                 //fake snapshot on index 7
1247                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1248
1249                 entries =
1250                         Arrays.asList(
1251                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1252                                         new MockRaftActorContext.MockPayload("foo-7"))
1253                         );
1254                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1255                 assertEquals(8, followerActor.getReplicatedLog().size());
1256
1257                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1258
1259
1260                 ByteString snapshotBytes = fromObject(Arrays.asList(
1261                         new MockRaftActorContext.MockPayload("foo-0"),
1262                         new MockRaftActorContext.MockPayload("foo-1"),
1263                         new MockRaftActorContext.MockPayload("foo-2"),
1264                         new MockRaftActorContext.MockPayload("foo-3"),
1265                         new MockRaftActorContext.MockPayload("foo-4")));
                     // Deliver the snapshot bytes as a CaptureSnapshotReply; the capture is expected
                     // to be finished afterwards.
1266                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1267                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1268
1269                 // The commit is needed to complete the snapshot creation process
1270                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1271
1272                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1273                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1274                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1275
                     // NOTE(review): the payload label foo-7 is reused for the entry with index 8,
                     // presumably a copy/paste leftover; nothing in this test asserts on the label.
1276                 entries =
1277                         Arrays.asList(
1278                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1279                                         new MockRaftActorContext.MockPayload("foo-7"))
1280                         );
1281                 // send an additional entry 8 with leaderCommit = 7
1282                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1283
1284                 // 7 and 8, as lastapplied is 7
1285                 assertEquals(2, followerActor.getReplicatedLog().size());
1286
1287             }
1288         };
1289     }
1290
1291     @Test
1292     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1293         new JavaTestKit(getSystem()) {
1294             {
1295                 String persistenceId = factory.generateActorId("leader-");
1296                 String follower1Id = factory.generateActorId("follower-");
1297                 String follower2Id = factory.generateActorId("follower-");
1298
1299                 ActorRef followerActor1 =
1300                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1301                 ActorRef followerActor2 =
1302                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1303
1304                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1305                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1306                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1307
1308                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1309
1310                 Map<String, String> peerAddresses = new HashMap<>();
1311                 peerAddresses.put(follower1Id, followerActor1.path().toString());
1312                 peerAddresses.put(follower2Id, followerActor2.path().toString());
1313
1314                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1315                         MockRaftActor.props(persistenceId, peerAddresses,
1316                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1317
1318                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1319                 leaderActor.getRaftActorContext().setCommitIndex(9);
1320                 leaderActor.getRaftActorContext().setLastApplied(9);
1321                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1322
1323                 leaderActor.waitForInitializeBehaviorComplete();
1324
1325                 Leader leader = new Leader(leaderActor.getRaftActorContext());
1326                 leaderActor.setCurrentBehavior(leader);
1327                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1328
1329                 // create 5 entries in the log
1330                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1331                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1332
1333                 //set the snapshot index to 4 , 0 to 4 are snapshotted
1334                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1335                 //setting replicatedToAllIndex = 9, for the log to clear
1336                 leader.setReplicatedToAllIndex(9);
1337                 assertEquals(5, leaderActor.getReplicatedLog().size());
1338                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1339
1340                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1341                 assertEquals(5, leaderActor.getReplicatedLog().size());
1342                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1343
1344                 // set the 2nd follower nextIndex to 1 which has been snapshotted
1345                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1346                 assertEquals(5, leaderActor.getReplicatedLog().size());
1347                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1348
1349                 // simulate a real snapshot
1350                 leaderActor.onReceiveCommand(new SendHeartBeat());
1351                 assertEquals(5, leaderActor.getReplicatedLog().size());
1352                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1353                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1354                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1355
1356
1357                 //reply from a slow follower does not initiate a fake snapshot
1358                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1359                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1360
1361                 ByteString snapshotBytes = fromObject(Arrays.asList(
1362                         new MockRaftActorContext.MockPayload("foo-0"),
1363                         new MockRaftActorContext.MockPayload("foo-1"),
1364                         new MockRaftActorContext.MockPayload("foo-2"),
1365                         new MockRaftActorContext.MockPayload("foo-3"),
1366                         new MockRaftActorContext.MockPayload("foo-4")));
1367                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1368                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1369
1370                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1371
1372                 //reply from a slow follower after should not raise errors
1373                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1374                 assertEquals(0, leaderActor.getReplicatedLog().size());
1375             }
1376         };
1377     }
1378
1379     @Test
1380     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1381         new JavaTestKit(getSystem()) {{
1382             String persistenceId = factory.generateActorId("leader-");
1383             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1384             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1385             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1386             config.setSnapshotBatchCount(5);
1387
1388             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1389
1390             Map<String, String> peerAddresses = new HashMap<>();
1391
1392             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1393                     MockRaftActor.props(persistenceId, peerAddresses,
1394                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1395
1396             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1397             leaderActor.getRaftActorContext().setCommitIndex(3);
1398             leaderActor.getRaftActorContext().setLastApplied(3);
1399             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1400
1401             leaderActor.waitForInitializeBehaviorComplete();
1402             for(int i=0;i< 4;i++) {
1403                 leaderActor.getReplicatedLog()
1404                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1405                                 new MockRaftActorContext.MockPayload("A")));
1406             }
1407
1408             Leader leader = new Leader(leaderActor.getRaftActorContext());
1409             leaderActor.setCurrentBehavior(leader);
1410             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1411
1412             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1413             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1414
1415             // Now send a CaptureSnapshotReply
1416             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1417
1418             // Trimming log in this scenario is a no-op
1419             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1420             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1421             assertEquals(-1, leader.getReplicatedToAllIndex());
1422
1423         }};
1424     }
1425
1426     @Test
1427     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1428         new JavaTestKit(getSystem()) {{
1429             String persistenceId = factory.generateActorId("leader-");
1430             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1431             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1432             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1433             config.setSnapshotBatchCount(5);
1434
1435             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1436
1437             Map<String, String> peerAddresses = new HashMap<>();
1438
1439             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1440                     MockRaftActor.props(persistenceId, peerAddresses,
1441                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1442
1443             MockRaftActor leaderActor = mockActorRef.underlyingActor();
1444             leaderActor.getRaftActorContext().setCommitIndex(3);
1445             leaderActor.getRaftActorContext().setLastApplied(3);
1446             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1447             leaderActor.getReplicatedLog().setSnapshotIndex(3);
1448
1449             leaderActor.waitForInitializeBehaviorComplete();
1450             Leader leader = new Leader(leaderActor.getRaftActorContext());
1451             leaderActor.setCurrentBehavior(leader);
1452             leader.setReplicatedToAllIndex(3);
1453             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1454
1455             // Persist another entry (this will cause a CaptureSnapshot to be triggered
1456             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1457
1458             // Now send a CaptureSnapshotReply
1459             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1460
1461             // Trimming log in this scenario is a no-op
1462             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1463             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1464             assertEquals(3, leader.getReplicatedToAllIndex());
1465
1466         }};
1467     }
1468
1469     private ByteString fromObject(Object snapshot) throws Exception {
1470         ByteArrayOutputStream b = null;
1471         ObjectOutputStream o = null;
1472         try {
1473             b = new ByteArrayOutputStream();
1474             o = new ObjectOutputStream(b);
1475             o.writeObject(snapshot);
1476             byte[] snapshotBytes = b.toByteArray();
1477             return ByteString.copyFrom(snapshotBytes);
1478         } finally {
1479             if (o != null) {
1480                 o.flush();
1481                 o.close();
1482             }
1483             if (b != null) {
1484                 b.close();
1485             }
1486         }
1487     }
1488
1489 }