Merge "Updating features archetype to talk about user-facing features."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
35 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
36 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
37 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
38 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
39 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
40 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
44 import scala.concurrent.duration.FiniteDuration;
45
46 import java.io.ByteArrayInputStream;
47 import java.io.ByteArrayOutputStream;
48 import java.io.IOException;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.util.ArrayList;
52 import java.util.Arrays;
53 import java.util.Collections;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59
60 import static org.junit.Assert.assertEquals;
61 import static org.junit.Assert.assertNotEquals;
62 import static org.junit.Assert.assertNotNull;
63 import static org.junit.Assert.assertNull;
64 import static org.junit.Assert.assertTrue;
65 import static org.mockito.Matchers.any;
66 import static org.mockito.Matchers.anyObject;
67 import static org.mockito.Matchers.eq;
68 import static org.mockito.Mockito.doReturn;
69 import static org.mockito.Mockito.mock;
70 import static org.mockito.Mockito.times;
71 import static org.mockito.Mockito.verify;
72
73 public class RaftActorTest extends AbstractActorTest {
74
75
76     @After
77     public void tearDown() {
78         MockAkkaJournal.clearJournal();
79         MockSnapshotStore.setMockSnapshot(null);
80     }
81
82     public static class MockRaftActor extends RaftActor {
83
84         private final DataPersistenceProvider dataPersistenceProvider;
85         private final RaftActor delegate;
86
87         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
88             private static final long serialVersionUID = 1L;
89             private final Map<String, String> peerAddresses;
90             private final String id;
91             private final Optional<ConfigParams> config;
92             private final DataPersistenceProvider dataPersistenceProvider;
93
94             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
95                     Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
96                 this.peerAddresses = peerAddresses;
97                 this.id = id;
98                 this.config = config;
99                 this.dataPersistenceProvider = dataPersistenceProvider;
100             }
101
102             @Override
103             public MockRaftActor create() throws Exception {
104                 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
105             }
106         }
107
108         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
109
110         private final List<Object> state;
111
112         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
113             super(id, peerAddresses, config);
114             state = new ArrayList<>();
115             this.delegate = mock(RaftActor.class);
116             if(dataPersistenceProvider == null){
117                 this.dataPersistenceProvider = new PersistentDataProvider();
118             } else {
119                 this.dataPersistenceProvider = dataPersistenceProvider;
120             }
121         }
122
123         public void waitForRecoveryComplete() {
124             try {
125                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
126             } catch (InterruptedException e) {
127                 e.printStackTrace();
128             }
129         }
130
131         public List<Object> getState() {
132             return state;
133         }
134
135         public static Props props(final String id, final Map<String, String> peerAddresses,
136                 Optional<ConfigParams> config){
137             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
138         }
139
140         public static Props props(final String id, final Map<String, String> peerAddresses,
141                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
142             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
143         }
144
145
146         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
147             delegate.applyState(clientActor, identifier, data);
148             LOG.info("applyState called");
149         }
150
151
152
153
154         @Override
155         protected void startLogRecoveryBatch(int maxBatchSize) {
156         }
157
158         @Override
159         protected void appendRecoveredLogEntry(Payload data) {
160             state.add(data);
161         }
162
163         @Override
164         protected void applyCurrentLogRecoveryBatch() {
165         }
166
167         @Override
168         protected void onRecoveryComplete() {
169             delegate.onRecoveryComplete();
170             recoveryComplete.countDown();
171         }
172
173         @Override
174         protected void applyRecoverySnapshot(ByteString snapshot) {
175             delegate.applyRecoverySnapshot(snapshot);
176             try {
177                 Object data = toObject(snapshot);
178                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
179                 if (data instanceof List) {
180                     state.addAll((List<?>) data);
181                 }
182             } catch (Exception e) {
183                 e.printStackTrace();
184             }
185         }
186
187         @Override protected void createSnapshot() {
188             delegate.createSnapshot();
189         }
190
191         @Override protected void applySnapshot(ByteString snapshot) {
192             delegate.applySnapshot(snapshot);
193         }
194
195         @Override protected void onStateChanged() {
196             delegate.onStateChanged();
197         }
198
199         @Override
200         protected DataPersistenceProvider persistence() {
201             return this.dataPersistenceProvider;
202         }
203
204         @Override public String persistenceId() {
205             return this.getId();
206         }
207
208         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
209             Object obj = null;
210             ByteArrayInputStream bis = null;
211             ObjectInputStream ois = null;
212             try {
213                 bis = new ByteArrayInputStream(bs.toByteArray());
214                 ois = new ObjectInputStream(bis);
215                 obj = ois.readObject();
216             } finally {
217                 if (bis != null) {
218                     bis.close();
219                 }
220                 if (ois != null) {
221                     ois.close();
222                 }
223             }
224             return obj;
225         }
226
227         public ReplicatedLog getReplicatedLog(){
228             return this.getRaftActorContext().getReplicatedLog();
229         }
230
231     }
232
233
234     private static class RaftActorTestKit extends JavaTestKit {
235         private final ActorRef raftActor;
236
237         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
238             super(actorSystem);
239
240             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
241                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
242
243         }
244
245
246         public ActorRef getRaftActor() {
247             return raftActor;
248         }
249
250         public boolean waitForLogMessage(final Class<?> logEventClass, String message){
251             // Wait for a specific log message to show up
252             return
253                 new JavaTestKit.EventFilter<Boolean>(logEventClass
254                 ) {
255                     @Override
256                     protected Boolean run() {
257                         return true;
258                     }
259                 }.from(raftActor.path().toString())
260                     .message(message)
261                     .occurrences(1).exec();
262
263
264         }
265
266         protected void waitUntilLeader(){
267             waitUntilLeader(raftActor);
268         }
269
270         protected void waitUntilLeader(ActorRef actorRef) {
271             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
272             for(int i = 0; i < 20 * 5; i++) {
273                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
274                 try {
275                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
276                     if(resp.getLeaderActor() != null) {
277                         return;
278                     }
279                 } catch(TimeoutException e) {
280                 } catch(Exception e) {
281                     System.err.println("FindLeader threw ex");
282                     e.printStackTrace();
283                 }
284
285
286                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
287             }
288
289             Assert.fail("Leader not found for actorRef " + actorRef.path());
290         }
291
292     }
293
294
295     @Test
296     public void testConstruction() {
297         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
298     }
299
300     @Test
301     public void testFindLeaderWhenLeaderIsSelf(){
302         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
303         kit.waitUntilLeader();
304     }
305
306     @Test
307     public void testRaftActorRecovery() throws Exception {
308         new JavaTestKit(getSystem()) {{
309             String persistenceId = "follower10";
310
311             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
312             // Set the heartbeat interval high to essentially disable election otherwise the test
313             // may fail if the actor is switched to Leader and the commitIndex is set to the last
314             // log entry.
315             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
316
317             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
318                     Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
319
320             watch(followerActor);
321
322             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
323             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
324                     new MockRaftActorContext.MockPayload("E"));
325             snapshotUnappliedEntries.add(entry1);
326
327             int lastAppliedDuringSnapshotCapture = 3;
328             int lastIndexDuringSnapshotCapture = 4;
329
330                 // 4 messages as part of snapshot, which are applied to state
331             ByteString snapshotBytes  = fromObject(Arrays.asList(
332                         new MockRaftActorContext.MockPayload("A"),
333                         new MockRaftActorContext.MockPayload("B"),
334                         new MockRaftActorContext.MockPayload("C"),
335                         new MockRaftActorContext.MockPayload("D")));
336
337             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
338                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
339                     lastAppliedDuringSnapshotCapture, 1);
340             MockSnapshotStore.setMockSnapshot(snapshot);
341             MockSnapshotStore.setPersistenceId(persistenceId);
342
343             // add more entries after snapshot is taken
344             List<ReplicatedLogEntry> entries = new ArrayList<>();
345             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
346                     new MockRaftActorContext.MockPayload("F"));
347             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
348                     new MockRaftActorContext.MockPayload("G"));
349             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
350                     new MockRaftActorContext.MockPayload("H"));
351             entries.add(entry2);
352             entries.add(entry3);
353             entries.add(entry4);
354
355             int lastAppliedToState = 5;
356             int lastIndex = 7;
357
358             MockAkkaJournal.addToJournal(5, entry2);
359             // 2 entries are applied to state besides the 4 entries in snapshot
360             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
361             MockAkkaJournal.addToJournal(7, entry3);
362             MockAkkaJournal.addToJournal(8, entry4);
363
364             // kill the actor
365             followerActor.tell(PoisonPill.getInstance(), null);
366             expectMsgClass(duration("5 seconds"), Terminated.class);
367
368             unwatch(followerActor);
369
370             //reinstate the actor
371             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
372                     MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
373                             Optional.<ConfigParams>of(config)));
374
375             ref.underlyingActor().waitForRecoveryComplete();
376
377             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
378             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
379                     context.getReplicatedLog().size());
380             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
381             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
382             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
383             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
384         }};
385     }
386
387     /**
388      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
389      * process recovery messages
390      *
391      * @throws Exception
392      */
393
394     @Test
395     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
396         new JavaTestKit(getSystem()) {
397             {
398                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
399
400                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
401
402                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
403
404                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
405                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
406
407                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
408
409                 // Wait for akka's recovery to complete so it doesn't interfere.
410                 mockRaftActor.waitForRecoveryComplete();
411
412                 ByteString snapshotBytes  = fromObject(Arrays.asList(
413                         new MockRaftActorContext.MockPayload("A"),
414                         new MockRaftActorContext.MockPayload("B"),
415                         new MockRaftActorContext.MockPayload("C"),
416                         new MockRaftActorContext.MockPayload("D")));
417
418                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
419                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
420
421                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
422
423                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
424
425                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
426
427                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
428
429                 assertEquals("add replicated log entry", 1, replicatedLog.size());
430
431                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
432
433                 assertEquals("add replicated log entry", 2, replicatedLog.size());
434
435                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
436
437                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
438
439                 // The snapshot had 4 items + we added 2 more items during the test
440                 // We start removing from 5 and we should get 1 item in the replicated log
441                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
442
443                 assertEquals("remove log entries", 1, replicatedLog.size());
444
445                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
446
447                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
448                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
449
450                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
451
452                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
453
454             }};
455     }
456
457     /**
458      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
459      * not process recovery messages
460      *
461      * @throws Exception
462      */
463     @Test
464     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
465         new JavaTestKit(getSystem()) {
466             {
467                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
468
469                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
470
471                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
472
473                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
474                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
475
476                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
477
478                 // Wait for akka's recovery to complete so it doesn't interfere.
479                 mockRaftActor.waitForRecoveryComplete();
480
481                 ByteString snapshotBytes  = fromObject(Arrays.asList(
482                         new MockRaftActorContext.MockPayload("A"),
483                         new MockRaftActorContext.MockPayload("B"),
484                         new MockRaftActorContext.MockPayload("C"),
485                         new MockRaftActorContext.MockPayload("D")));
486
487                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
488                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
489
490                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
491
492                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
493
494                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
495
496                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
497
498                 assertEquals("add replicated log entry", 0, replicatedLog.size());
499
500                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
501
502                 assertEquals("add replicated log entry", 0, replicatedLog.size());
503
504                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
505
506                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
507
508                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
509
510                 assertEquals("remove log entries", 0, replicatedLog.size());
511
512                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
513
514                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
515                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
516
517                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
518
519                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
520             }};
521     }
522
523
524     @Test
525     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
526         new JavaTestKit(getSystem()) {
527             {
528                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
529
530                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
531
532                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
533
534                 CountDownLatch persistLatch = new CountDownLatch(1);
535                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
536                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
537
538                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
539                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
540
541                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
542
543                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
544
545                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
546
547                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
548
549             }
550         };
551     }
552
553     @Test
554     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
555         new JavaTestKit(getSystem()) {
556             {
557                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
558
559                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
560
561                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
562
563                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
564
565                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
566                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
567
568                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
569
570                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
571
572                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
573
574                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
575
576                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
577
578             }
579         };
580     }
581
582     @Test
583     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
584         new JavaTestKit(getSystem()) {
585             {
586                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
587
588                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
589
590                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
591
592                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
593
594                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
595                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
596
597                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
598
599                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
600
601                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
602
603                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
604
605                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
606
607             }
608         };
609     }
610
611     @Test
612     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
613         new JavaTestKit(getSystem()) {
614             {
615                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
616
617                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
618
619                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
620
621                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
622
623                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
624                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
625
626                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
627
628                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
629
630                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
631
632                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
633
634             }
635         };
636     }
637
638     @Test
639     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
640         new JavaTestKit(getSystem()) {
641             {
642                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
643
644                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
645
646                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
647
648                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
649
650                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
651                     MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
652                         Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
653
654                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
655
656                 ByteString snapshotBytes  = fromObject(Arrays.asList(
657                         new MockRaftActorContext.MockPayload("A"),
658                         new MockRaftActorContext.MockPayload("B"),
659                         new MockRaftActorContext.MockPayload("C"),
660                         new MockRaftActorContext.MockPayload("D")));
661
662                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
663
664                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
665
666                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
667
668                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
669
670                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
671
672                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
673
674             }
675         };
676     }
677
678     @Test
679     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
680         new JavaTestKit(getSystem()) {
681             {
682                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
683
684                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
685
686                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
687
688                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
689
690                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
691                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
692
693                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
694
695                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
696                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
697                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
698                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
699                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
700
701                 ByteString snapshotBytes = fromObject(Arrays.asList(
702                         new MockRaftActorContext.MockPayload("A"),
703                         new MockRaftActorContext.MockPayload("B"),
704                         new MockRaftActorContext.MockPayload("C"),
705                         new MockRaftActorContext.MockPayload("D")));
706
707                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
708                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
709
710                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
711
712                 verify(mockRaftActor.delegate).createSnapshot();
713
714                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
715
716                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
717
718                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
719
720                 verify(dataPersistenceProvider).deleteMessages(100);
721
722                 assertEquals(2, mockRaftActor.getReplicatedLog().size());
723
724                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
725                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
726
727                 // Index 2 will not be in the log because it was removed due to snapshotting
728                 assertNull(mockRaftActor.getReplicatedLog().get(2));
729
730                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
731
732             }
733         };
734     }
735
736     @Test
737     public void testApplyState() throws Exception {
738
739         new JavaTestKit(getSystem()) {
740             {
741                 String persistenceId = "testApplyState";
742
743                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
744
745                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
746
747                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
748
749                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
750                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
751
752                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
753
754                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
755                         new MockRaftActorContext.MockPayload("F"));
756
757                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
758
759                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
760
761                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
762
763             }
764         };
765     }
766
767     @Test
768     public void testApplySnapshot() throws Exception {
769         new JavaTestKit(getSystem()) {
770             {
771                 String persistenceId = "testApplySnapshot";
772
773                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
774
775                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
776
777                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
778
779                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
780                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
781
782                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
783
784                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
785
786                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
787                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
788                 oldReplicatedLog.append(
789                     new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
790                         mock(Payload.class)));
791
792                 ByteString snapshotBytes = fromObject(Arrays.asList(
793                     new MockRaftActorContext.MockPayload("A"),
794                     new MockRaftActorContext.MockPayload("B"),
795                     new MockRaftActorContext.MockPayload("C"),
796                     new MockRaftActorContext.MockPayload("D")));
797
798                 Snapshot snapshot = mock(Snapshot.class);
799
800                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
801
802                 doReturn(3L).when(snapshot).getLastAppliedIndex();
803
804                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
805
806                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
807
808                 assertTrue("The replicatedLog should have changed",
809                     oldReplicatedLog != mockRaftActor.getReplicatedLog());
810
811                 assertEquals("lastApplied should be same as in the snapshot",
812                     (Long) 3L, mockRaftActor.getLastApplied());
813
814                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
815
816                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
817
818             }
819         };
820     }
821
822     @Test
823     public void testSaveSnapshotFailure() throws Exception {
824         new JavaTestKit(getSystem()) {
825             {
826                 String persistenceId = "testSaveSnapshotFailure";
827
828                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
829
830                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
831
832                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
833
834                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
835                         Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
836
837                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
838
839                 ByteString snapshotBytes  = fromObject(Arrays.asList(
840                         new MockRaftActorContext.MockPayload("A"),
841                         new MockRaftActorContext.MockPayload("B"),
842                         new MockRaftActorContext.MockPayload("C"),
843                         new MockRaftActorContext.MockPayload("D")));
844
845                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
846
847                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
848
849                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
850
851                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
852
853                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
854                         new Exception()));
855
856                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
857                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
858
859                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
860
861             }
862         };
863     }
864
865     private ByteString fromObject(Object snapshot) throws Exception {
866         ByteArrayOutputStream b = null;
867         ObjectOutputStream o = null;
868         try {
869             b = new ByteArrayOutputStream();
870             o = new ObjectOutputStream(b);
871             o.writeObject(snapshot);
872             byte[] snapshotBytes = b.toByteArray();
873             return ByteString.copyFrom(snapshotBytes);
874         } finally {
875             if (o != null) {
876                 o.flush();
877                 o.close();
878             }
879             if (b != null) {
880                 b.close();
881             }
882         }
883     }
884 }