Merge "Remove l2switch sample"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
35 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
36 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
37 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
38 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
42 import scala.concurrent.duration.FiniteDuration;
43
44 import java.io.ByteArrayInputStream;
45 import java.io.ByteArrayOutputStream;
46 import java.io.IOException;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collections;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.TimeoutException;
57
58 import static org.junit.Assert.assertEquals;
59 import static org.junit.Assert.assertNotEquals;
60 import static org.junit.Assert.assertNotNull;
61 import static org.junit.Assert.assertNull;
62 import static org.junit.Assert.assertTrue;
63 import static org.mockito.Matchers.any;
64 import static org.mockito.Matchers.anyObject;
65 import static org.mockito.Matchers.eq;
66 import static org.mockito.Mockito.doReturn;
67 import static org.mockito.Mockito.mock;
68 import static org.mockito.Mockito.times;
69 import static org.mockito.Mockito.verify;
70
71 public class RaftActorTest extends AbstractActorTest {
72
73
74     @After
75     public void tearDown() {
76         MockAkkaJournal.clearJournal();
77         MockSnapshotStore.setMockSnapshot(null);
78     }
79
80     public static class MockRaftActor extends RaftActor {
81
82         private final DataPersistenceProvider dataPersistenceProvider;
83         private final RaftActor delegate;
84
85         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
86             private final Map<String, String> peerAddresses;
87             private final String id;
88             private final Optional<ConfigParams> config;
89             private final DataPersistenceProvider dataPersistenceProvider;
90
91             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
92                     Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
93                 this.peerAddresses = peerAddresses;
94                 this.id = id;
95                 this.config = config;
96                 this.dataPersistenceProvider = dataPersistenceProvider;
97             }
98
99             @Override
100             public MockRaftActor create() throws Exception {
101                 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
102             }
103         }
104
105         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
106
107         private final List<Object> state;
108
109         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
110             super(id, peerAddresses, config);
111             state = new ArrayList<>();
112             this.delegate = mock(RaftActor.class);
113             if(dataPersistenceProvider == null){
114                 this.dataPersistenceProvider = new PersistentDataProvider();
115             } else {
116                 this.dataPersistenceProvider = dataPersistenceProvider;
117             }
118         }
119
120         public void waitForRecoveryComplete() {
121             try {
122                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
123             } catch (InterruptedException e) {
124                 e.printStackTrace();
125             }
126         }
127
128         public List<Object> getState() {
129             return state;
130         }
131
132         public static Props props(final String id, final Map<String, String> peerAddresses,
133                 Optional<ConfigParams> config){
134             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
135         }
136
137         public static Props props(final String id, final Map<String, String> peerAddresses,
138                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
139             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
140         }
141
142
143         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
144             delegate.applyState(clientActor, identifier, data);
145             LOG.info("applyState called");
146         }
147
148
149
150
151         @Override
152         protected void startLogRecoveryBatch(int maxBatchSize) {
153         }
154
155         @Override
156         protected void appendRecoveredLogEntry(Payload data) {
157             state.add(data);
158         }
159
160         @Override
161         protected void applyCurrentLogRecoveryBatch() {
162         }
163
164         @Override
165         protected void onRecoveryComplete() {
166             delegate.onRecoveryComplete();
167             recoveryComplete.countDown();
168         }
169
170         @Override
171         protected void applyRecoverySnapshot(ByteString snapshot) {
172             delegate.applyRecoverySnapshot(snapshot);
173             try {
174                 Object data = toObject(snapshot);
175                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
176                 if (data instanceof List) {
177                     state.addAll((List) data);
178                 }
179             } catch (Exception e) {
180                 e.printStackTrace();
181             }
182         }
183
184         @Override protected void createSnapshot() {
185             delegate.createSnapshot();
186         }
187
188         @Override protected void applySnapshot(ByteString snapshot) {
189             delegate.applySnapshot(snapshot);
190         }
191
192         @Override protected void onStateChanged() {
193             delegate.onStateChanged();
194         }
195
196         @Override
197         protected DataPersistenceProvider persistence() {
198             return this.dataPersistenceProvider;
199         }
200
201         @Override public String persistenceId() {
202             return this.getId();
203         }
204
205         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
206             Object obj = null;
207             ByteArrayInputStream bis = null;
208             ObjectInputStream ois = null;
209             try {
210                 bis = new ByteArrayInputStream(bs.toByteArray());
211                 ois = new ObjectInputStream(bis);
212                 obj = ois.readObject();
213             } finally {
214                 if (bis != null) {
215                     bis.close();
216                 }
217                 if (ois != null) {
218                     ois.close();
219                 }
220             }
221             return obj;
222         }
223
224         public ReplicatedLog getReplicatedLog(){
225             return this.getRaftActorContext().getReplicatedLog();
226         }
227
228     }
229
230
231     private static class RaftActorTestKit extends JavaTestKit {
232         private final ActorRef raftActor;
233
234         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
235             super(actorSystem);
236
237             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
238                     Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
239
240         }
241
242
243         public ActorRef getRaftActor() {
244             return raftActor;
245         }
246
247         public boolean waitForLogMessage(final Class logEventClass, String message){
248             // Wait for a specific log message to show up
249             return
250                 new JavaTestKit.EventFilter<Boolean>(logEventClass
251                 ) {
252                     @Override
253                     protected Boolean run() {
254                         return true;
255                     }
256                 }.from(raftActor.path().toString())
257                     .message(message)
258                     .occurrences(1).exec();
259
260
261         }
262
263         protected void waitUntilLeader(){
264             waitUntilLeader(raftActor);
265         }
266
267         protected void waitUntilLeader(ActorRef actorRef) {
268             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
269             for(int i = 0; i < 20 * 5; i++) {
270                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
271                 try {
272                     FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
273                     if(resp.getLeaderActor() != null) {
274                         return;
275                     }
276                 } catch(TimeoutException e) {
277                 } catch(Exception e) {
278                     System.err.println("FindLeader threw ex");
279                     e.printStackTrace();
280                 }
281
282
283                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
284             }
285
286             Assert.fail("Leader not found for actorRef " + actorRef.path());
287         }
288
289     }
290
291
292     @Test
293     public void testConstruction() {
294         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
295     }
296
297     @Test
298     public void testFindLeaderWhenLeaderIsSelf(){
299         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
300         kit.waitUntilLeader();
301     }
302
303     @Test
304     public void testRaftActorRecovery() throws Exception {
305         new JavaTestKit(getSystem()) {{
306             String persistenceId = "follower10";
307
308             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
309             // Set the heartbeat interval high to essentially disable election otherwise the test
310             // may fail if the actor is switched to Leader and the commitIndex is set to the last
311             // log entry.
312             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
313
314             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
315                     Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
316
317             watch(followerActor);
318
319             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
320             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
321                     new MockRaftActorContext.MockPayload("E"));
322             snapshotUnappliedEntries.add(entry1);
323
324             int lastAppliedDuringSnapshotCapture = 3;
325             int lastIndexDuringSnapshotCapture = 4;
326
327                 // 4 messages as part of snapshot, which are applied to state
328             ByteString snapshotBytes  = fromObject(Arrays.asList(
329                         new MockRaftActorContext.MockPayload("A"),
330                         new MockRaftActorContext.MockPayload("B"),
331                         new MockRaftActorContext.MockPayload("C"),
332                         new MockRaftActorContext.MockPayload("D")));
333
334             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
335                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
336                     lastAppliedDuringSnapshotCapture, 1);
337             MockSnapshotStore.setMockSnapshot(snapshot);
338             MockSnapshotStore.setPersistenceId(persistenceId);
339
340             // add more entries after snapshot is taken
341             List<ReplicatedLogEntry> entries = new ArrayList<>();
342             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
343                     new MockRaftActorContext.MockPayload("F"));
344             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
345                     new MockRaftActorContext.MockPayload("G"));
346             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
347                     new MockRaftActorContext.MockPayload("H"));
348             entries.add(entry2);
349             entries.add(entry3);
350             entries.add(entry4);
351
352             int lastAppliedToState = 5;
353             int lastIndex = 7;
354
355             MockAkkaJournal.addToJournal(5, entry2);
356             // 2 entries are applied to state besides the 4 entries in snapshot
357             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
358             MockAkkaJournal.addToJournal(7, entry3);
359             MockAkkaJournal.addToJournal(8, entry4);
360
361             // kill the actor
362             followerActor.tell(PoisonPill.getInstance(), null);
363             expectMsgClass(duration("5 seconds"), Terminated.class);
364
365             unwatch(followerActor);
366
367             //reinstate the actor
368             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
369                     MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
370                             Optional.<ConfigParams>of(config)));
371
372             ref.underlyingActor().waitForRecoveryComplete();
373
374             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
375             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
376                     context.getReplicatedLog().size());
377             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
378             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
379             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
380             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
381         }};
382     }
383
384     /**
385      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
386      * process recovery messages
387      *
388      * @throws Exception
389      */
390
391     @Test
392     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
393         new JavaTestKit(getSystem()) {
394             {
395                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
396
397                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
398
399                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
400
401                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
402                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
403
404                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
405
406                 // Wait for akka's recovery to complete so it doesn't interfere.
407                 mockRaftActor.waitForRecoveryComplete();
408
409                 ByteString snapshotBytes  = fromObject(Arrays.asList(
410                         new MockRaftActorContext.MockPayload("A"),
411                         new MockRaftActorContext.MockPayload("B"),
412                         new MockRaftActorContext.MockPayload("C"),
413                         new MockRaftActorContext.MockPayload("D")));
414
415                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
416                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
417
418                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
419
420                 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
421
422                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
423
424                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
425
426                 assertEquals("add replicated log entry", 1, replicatedLog.size());
427
428                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
429
430                 assertEquals("add replicated log entry", 2, replicatedLog.size());
431
432                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
433
434                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
435
436                 // The snapshot had 4 items + we added 2 more items during the test
437                 // We start removing from 5 and we should get 1 item in the replicated log
438                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
439
440                 assertEquals("remove log entries", 1, replicatedLog.size());
441
442                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
443
444                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
445                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
446
447                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
448
449                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
450
451             }};
452     }
453
454     /**
455      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
456      * not process recovery messages
457      *
458      * @throws Exception
459      */
460     @Test
461     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
462         new JavaTestKit(getSystem()) {
463             {
464                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
465
466                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
467
468                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
469
470                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
471                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
472
473                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
474
475                 // Wait for akka's recovery to complete so it doesn't interfere.
476                 mockRaftActor.waitForRecoveryComplete();
477
478                 ByteString snapshotBytes  = fromObject(Arrays.asList(
479                         new MockRaftActorContext.MockPayload("A"),
480                         new MockRaftActorContext.MockPayload("B"),
481                         new MockRaftActorContext.MockPayload("C"),
482                         new MockRaftActorContext.MockPayload("D")));
483
484                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
485                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
486
487                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
488
489                 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
490
491                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
492
493                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
494
495                 assertEquals("add replicated log entry", 0, replicatedLog.size());
496
497                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
498
499                 assertEquals("add replicated log entry", 0, replicatedLog.size());
500
501                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
502
503                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
504
505                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
506
507                 assertEquals("remove log entries", 0, replicatedLog.size());
508
509                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
510
511                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
512                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
513
514                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
515
516                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
517             }};
518     }
519
520
521     @Test
522     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
523         new JavaTestKit(getSystem()) {
524             {
525                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
526
527                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
528
529                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
530
531                 CountDownLatch persistLatch = new CountDownLatch(1);
532                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
533                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
534
535                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
536                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
537
538                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
539
540                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
541
542                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
543
544                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
545
546             }
547         };
548     }
549
550     @Test
551     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
552         new JavaTestKit(getSystem()) {
553             {
554                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
555
556                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
557
558                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
559
560
561                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
562
563                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
564                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
565
566                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
567
568                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
569
570                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
571
572                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
573
574                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
575
576             }
577         };
578     }
579
580     @Test
581     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
582         new JavaTestKit(getSystem()) {
583             {
584                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
585
586                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
587
588                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
589
590                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
591
592                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
593                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
594
595                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
596
597                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
598
599                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
600
601                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
602
603
604                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
605
606             }
607         };
608     }
609
610     @Test
611     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
612         new JavaTestKit(getSystem()) {
613             {
614                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
615
616                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
617
618                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
619
620                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
621
622                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
623                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
624
625                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
626
627                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
628
629                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
630
631                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
632
633             }
634         };
635     }
636
637     @Test
638     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
639         new JavaTestKit(getSystem()) {
640             {
641                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
642
643                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
644
645                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
646
647                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
648
649                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
650                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
651
652                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
653
654                 ByteString snapshotBytes  = fromObject(Arrays.asList(
655                         new MockRaftActorContext.MockPayload("A"),
656                         new MockRaftActorContext.MockPayload("B"),
657                         new MockRaftActorContext.MockPayload("C"),
658                         new MockRaftActorContext.MockPayload("D")));
659
660                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
661
662                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
663
664                 verify(dataPersistenceProvider).saveSnapshot(anyObject());
665
666                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
667
668             }
669         };
670     }
671
672     @Test
673     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
674         new JavaTestKit(getSystem()) {
675             {
676                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
677
678                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
679
680                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
681
682                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
683
684                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
685                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
686
687                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
688
689                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
690                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
691                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
692                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
693                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
694
695                 ByteString snapshotBytes = fromObject(Arrays.asList(
696                         new MockRaftActorContext.MockPayload("A"),
697                         new MockRaftActorContext.MockPayload("B"),
698                         new MockRaftActorContext.MockPayload("C"),
699                         new MockRaftActorContext.MockPayload("D")));
700
701                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
702
703                 verify(mockRaftActor.delegate).createSnapshot();
704
705                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
706
707                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
708
709                 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
710
711                 verify(dataPersistenceProvider).deleteMessages(100);
712
713                 assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot());
714
715                 assertEquals(2, mockRaftActor.getReplicatedLog().size());
716
717                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
718                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
719
720                 // Index 2 will not be in the log because it was removed due to snapshotting
721                 assertNull(mockRaftActor.getReplicatedLog().get(2));
722
723                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
724
725             }
726         };
727     }
728
729     @Test
730     public void testApplyState() throws Exception {
731
732         new JavaTestKit(getSystem()) {
733             {
734                 String persistenceId = "testApplyState";
735
736                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
737
738                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
739
740                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
741
742                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
743                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
744
745                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
746
747                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
748                         new MockRaftActorContext.MockPayload("F"));
749
750                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
751
752                 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
753
754                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
755
756             }
757         };
758
759
760     }
761
762     @Test
763     public void testApplySnapshot() throws Exception {
764         new JavaTestKit(getSystem()) {
765             {
766                 String persistenceId = "testApplySnapshot";
767
768                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
769
770                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
771
772                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
773
774                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
775                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
776
777                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
778
779                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
780
781                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
782                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
783                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class)));
784
785                 ByteString snapshotBytes = fromObject(Arrays.asList(
786                         new MockRaftActorContext.MockPayload("A"),
787                         new MockRaftActorContext.MockPayload("B"),
788                         new MockRaftActorContext.MockPayload("C"),
789                         new MockRaftActorContext.MockPayload("D")));
790
791                 Snapshot snapshot = mock(Snapshot.class);
792
793                 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
794
795                 doReturn(3L).when(snapshot).getLastAppliedIndex();
796
797                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
798
799                 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
800
801                 assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog());
802
803                 assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied());
804
805                 assertEquals(0, mockRaftActor.getReplicatedLog().size());
806
807                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
808
809             }
810         };
811     }
812
813     @Test
814     public void testSaveSnapshotFailure() throws Exception {
815         new JavaTestKit(getSystem()) {
816             {
817                 String persistenceId = "testSaveSnapshotFailure";
818
819                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
820
821                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
822
823                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
824
825                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
826                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
827
828                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
829
830                 ByteString snapshotBytes  = fromObject(Arrays.asList(
831                         new MockRaftActorContext.MockPayload("A"),
832                         new MockRaftActorContext.MockPayload("B"),
833                         new MockRaftActorContext.MockPayload("C"),
834                         new MockRaftActorContext.MockPayload("D")));
835
836                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
837
838                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
839
840                 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
841                         new Exception()));
842
843                 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
844                         mockRaftActor.getReplicatedLog().getSnapshotIndex());
845
846                 assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot());
847
848                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
849
850             }
851         };
852     }
853
854     private ByteString fromObject(Object snapshot) throws Exception {
855         ByteArrayOutputStream b = null;
856         ObjectOutputStream o = null;
857         try {
858             b = new ByteArrayOutputStream();
859             o = new ObjectOutputStream(b);
860             o.writeObject(snapshot);
861             byte[] snapshotBytes = b.toByteArray();
862             return ByteString.copyFrom(snapshotBytes);
863         } finally {
864             if (o != null) {
865                 o.flush();
866                 o.close();
867             }
868             if (b != null) {
869                 b.close();
870             }
871         }
872     }
873 }