Merge "BUG-1953: fix SAL compatility layer"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.event.Logging;
9 import akka.japi.Creator;
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SaveSnapshotSuccess;
12 import akka.persistence.SnapshotMetadata;
13 import akka.persistence.SnapshotOffer;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.Lists;
18 import com.google.protobuf.ByteString;
19 import org.junit.After;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.DataPersistenceProvider;
22 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
24 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
30 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
31 import scala.concurrent.duration.FiniteDuration;
32
33 import java.io.ByteArrayInputStream;
34 import java.io.ByteArrayOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectInputStream;
37 import java.io.ObjectOutputStream;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotEquals;
48 import static org.mockito.Mockito.mock;
49
50 public class RaftActorTest extends AbstractActorTest {
51
52
53     @After
54     public void tearDown() {
55         MockAkkaJournal.clearJournal();
56         MockSnapshotStore.setMockSnapshot(null);
57     }
58
59     public static class MockRaftActor extends RaftActor {
60
61         private final DataPersistenceProvider dataPersistenceProvider;
62
63         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
64             private final Map<String, String> peerAddresses;
65             private final String id;
66             private final Optional<ConfigParams> config;
67             private final DataPersistenceProvider dataPersistenceProvider;
68
69             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
70                     Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
71                 this.peerAddresses = peerAddresses;
72                 this.id = id;
73                 this.config = config;
74                 this.dataPersistenceProvider = dataPersistenceProvider;
75             }
76
77             @Override
78             public MockRaftActor create() throws Exception {
79                 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
80             }
81         }
82
83         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
84         private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
85         private final CountDownLatch applyStateLatch = new CountDownLatch(1);
86
87         private final List<Object> state;
88
89         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
90             super(id, peerAddresses, config);
91             state = new ArrayList<>();
92             if(dataPersistenceProvider == null){
93                 this.dataPersistenceProvider = new PersistentDataProvider();
94             } else {
95                 this.dataPersistenceProvider = dataPersistenceProvider;
96             }
97         }
98
99         public void waitForRecoveryComplete() {
100             try {
101                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
102             } catch (InterruptedException e) {
103                 e.printStackTrace();
104             }
105         }
106
107         public CountDownLatch getApplyRecoverySnapshotLatch(){
108             return applyRecoverySnapshot;
109         }
110
111         public List<Object> getState() {
112             return state;
113         }
114
115         public static Props props(final String id, final Map<String, String> peerAddresses,
116                 Optional<ConfigParams> config){
117             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
118         }
119
120         public static Props props(final String id, final Map<String, String> peerAddresses,
121                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
122             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
123         }
124
125
126         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
127             applyStateLatch.countDown();
128         }
129
130         @Override
131         protected void startLogRecoveryBatch(int maxBatchSize) {
132         }
133
134         @Override
135         protected void appendRecoveredLogEntry(Payload data) {
136             state.add(data);
137         }
138
139         @Override
140         protected void applyCurrentLogRecoveryBatch() {
141         }
142
143         @Override
144         protected void onRecoveryComplete() {
145             recoveryComplete.countDown();
146         }
147
148         @Override
149         protected void applyRecoverySnapshot(ByteString snapshot) {
150             applyRecoverySnapshot.countDown();
151             try {
152                 Object data = toObject(snapshot);
153                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
154                 if (data instanceof List) {
155                     state.addAll((List) data);
156                 }
157             } catch (Exception e) {
158                 e.printStackTrace();
159             }
160         }
161
162         @Override protected void createSnapshot() {
163         }
164
165         @Override protected void applySnapshot(ByteString snapshot) {
166         }
167
168         @Override protected void onStateChanged() {
169         }
170
171         @Override
172         protected DataPersistenceProvider persistence() {
173             return this.dataPersistenceProvider;
174         }
175
176         @Override public String persistenceId() {
177             return this.getId();
178         }
179
180         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
181             Object obj = null;
182             ByteArrayInputStream bis = null;
183             ObjectInputStream ois = null;
184             try {
185                 bis = new ByteArrayInputStream(bs.toByteArray());
186                 ois = new ObjectInputStream(bis);
187                 obj = ois.readObject();
188             } finally {
189                 if (bis != null) {
190                     bis.close();
191                 }
192                 if (ois != null) {
193                     ois.close();
194                 }
195             }
196             return obj;
197         }
198
199         public ReplicatedLog getReplicatedLog(){
200             return this.getRaftActorContext().getReplicatedLog();
201         }
202
203     }
204
205
206     private static class RaftActorTestKit extends JavaTestKit {
207         private final ActorRef raftActor;
208
209         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
210             super(actorSystem);
211
212             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
213                     Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
214
215         }
216
217
218         public boolean waitForStartup(){
219             // Wait for a specific log message to show up
220             return
221                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
222                 ) {
223                     @Override
224                     protected Boolean run() {
225                         return true;
226                     }
227                 }.from(raftActor.path().toString())
228                     .message("Switching from behavior Candidate to Leader")
229                     .occurrences(1).exec();
230
231
232         }
233
234         public void findLeader(final String expectedLeader){
235             raftActor.tell(new FindLeader(), getRef());
236
237             FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
238             assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
239         }
240
241         public ActorRef getRaftActor() {
242             return raftActor;
243         }
244     }
245
246
247     @Test
248     public void testConstruction() {
249         boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
250         assertEquals(true, started);
251     }
252
253     @Test
254     public void testFindLeaderWhenLeaderIsSelf(){
255         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
256         kit.waitForStartup();
257         kit.findLeader(kit.getRaftActor().path().toString());
258     }
259
260     @Test
261     public void testRaftActorRecovery() throws Exception {
262         new JavaTestKit(getSystem()) {{
263             String persistenceId = "follower10";
264
265             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
266             // Set the heartbeat interval high to essentially disable election otherwise the test
267             // may fail if the actor is switched to Leader and the commitIndex is set to the last
268             // log entry.
269             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
270
271             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
272                     Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
273
274             watch(followerActor);
275
276             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
277             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
278                     new MockRaftActorContext.MockPayload("E"));
279             snapshotUnappliedEntries.add(entry1);
280
281             int lastAppliedDuringSnapshotCapture = 3;
282             int lastIndexDuringSnapshotCapture = 4;
283
284                 // 4 messages as part of snapshot, which are applied to state
285             ByteString snapshotBytes  = fromObject(Arrays.asList(
286                         new MockRaftActorContext.MockPayload("A"),
287                         new MockRaftActorContext.MockPayload("B"),
288                         new MockRaftActorContext.MockPayload("C"),
289                         new MockRaftActorContext.MockPayload("D")));
290
291             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
292                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
293                     lastAppliedDuringSnapshotCapture, 1);
294             MockSnapshotStore.setMockSnapshot(snapshot);
295             MockSnapshotStore.setPersistenceId(persistenceId);
296
297             // add more entries after snapshot is taken
298             List<ReplicatedLogEntry> entries = new ArrayList<>();
299             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
300                     new MockRaftActorContext.MockPayload("F"));
301             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
302                     new MockRaftActorContext.MockPayload("G"));
303             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
304                     new MockRaftActorContext.MockPayload("H"));
305             entries.add(entry2);
306             entries.add(entry3);
307             entries.add(entry4);
308
309             int lastAppliedToState = 5;
310             int lastIndex = 7;
311
312             MockAkkaJournal.addToJournal(5, entry2);
313             // 2 entries are applied to state besides the 4 entries in snapshot
314             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
315             MockAkkaJournal.addToJournal(7, entry3);
316             MockAkkaJournal.addToJournal(8, entry4);
317
318             // kill the actor
319             followerActor.tell(PoisonPill.getInstance(), null);
320             expectMsgClass(duration("5 seconds"), Terminated.class);
321
322             unwatch(followerActor);
323
324             //reinstate the actor
325             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
326                     MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
327                             Optional.<ConfigParams>of(config)));
328
329             ref.underlyingActor().waitForRecoveryComplete();
330
331             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
332             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
333                     context.getReplicatedLog().size());
334             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
335             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
336             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
337             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
338         }};
339     }
340
341     /**
342      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
343      * process recovery messages
344      *
345      * @throws Exception
346      */
347
348     @Test
349     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
350         new JavaTestKit(getSystem()) {
351             {
352                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
353
354                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
355
356                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
357
358                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
359                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
360
361                 ByteString snapshotBytes  = fromObject(Arrays.asList(
362                         new MockRaftActorContext.MockPayload("A"),
363                         new MockRaftActorContext.MockPayload("B"),
364                         new MockRaftActorContext.MockPayload("C"),
365                         new MockRaftActorContext.MockPayload("D")));
366
367                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
368                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
369
370                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
371
372                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
373
374                 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
375
376                 assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
377
378                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
379
380                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
381
382                 assertEquals("add replicated log entry", 1, replicatedLog.size());
383
384                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
385
386                 assertEquals("add replicated log entry", 2, replicatedLog.size());
387
388                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
389
390                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
391
392                 // The snapshot had 4 items + we added 2 more items during the test
393                 // We start removing from 5 and we should get 1 item in the replicated log
394                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
395
396                 assertEquals("remove log entries", 1, replicatedLog.size());
397
398                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
399
400                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
401                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
402
403                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
404
405                 mockRaftActor.waitForRecoveryComplete();
406
407                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
408
409             }};
410     }
411
412     /**
413      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
414      * not process recovery messages
415      *
416      * @throws Exception
417      */
418     @Test
419     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
420         new JavaTestKit(getSystem()) {
421             {
422                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
423
424                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
425
426                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
427
428                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
429                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
430
431                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
432
433                 ByteString snapshotBytes  = fromObject(Arrays.asList(
434                         new MockRaftActorContext.MockPayload("A"),
435                         new MockRaftActorContext.MockPayload("B"),
436                         new MockRaftActorContext.MockPayload("C"),
437                         new MockRaftActorContext.MockPayload("D")));
438
439                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
440                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
441
442                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
443
444                 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
445
446                 assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
447
448                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
449
450                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
451
452                 assertEquals("add replicated log entry", 0, replicatedLog.size());
453
454                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
455
456                 assertEquals("add replicated log entry", 0, replicatedLog.size());
457
458                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
459
460                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
461
462                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
463
464                 assertEquals("remove log entries", 0, replicatedLog.size());
465
466                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
467
468                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
469                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
470
471                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
472
473                 mockRaftActor.waitForRecoveryComplete();
474
475                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
476             }};
477     }
478
479
480     @Test
481     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
482         new JavaTestKit(getSystem()) {
483             {
484                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
485
486                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
487
488                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
489
490                 CountDownLatch persistLatch = new CountDownLatch(1);
491                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
492                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
493
494                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
495                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
496
497                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
498
499                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
500
501                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
502
503                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
504
505             }
506         };
507     }
508
509     @Test
510     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
511         new JavaTestKit(getSystem()) {
512             {
513                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
514
515                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
516
517                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
518
519                 CountDownLatch persistLatch = new CountDownLatch(1);
520                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
521                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
522
523                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
524                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
525
526                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
527
528                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
529
530                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
531
532                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
533
534             }
535         };
536     }
537
538     @Test
539     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
540         new JavaTestKit(getSystem()) {
541             {
542                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
543
544                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
545
546                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
547
548                 CountDownLatch persistLatch = new CountDownLatch(2);
549                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
550                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
551
552                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
553                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
554
555                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
556
557                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
558
559                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
560
561                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
562
563                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
564
565             }
566         };
567     }
568
569     @Test
570     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
571         new JavaTestKit(getSystem()) {
572             {
573                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
574
575                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
576
577                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
578
579                 CountDownLatch persistLatch = new CountDownLatch(1);
580                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
581                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
582
583                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
584                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
585
586                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
587
588                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
589
590                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
591
592                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
593
594             }
595         };
596     }
597
598     @Test
599     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
600         new JavaTestKit(getSystem()) {
601             {
602                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
603
604                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
605
606                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
607
608                 CountDownLatch persistLatch = new CountDownLatch(1);
609                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
610                 dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
611
612                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
613                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
614
615                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
616
617                 ByteString snapshotBytes  = fromObject(Arrays.asList(
618                         new MockRaftActorContext.MockPayload("A"),
619                         new MockRaftActorContext.MockPayload("B"),
620                         new MockRaftActorContext.MockPayload("C"),
621                         new MockRaftActorContext.MockPayload("D")));
622
623                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
624
625                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
626
627                 assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
628
629                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
630
631             }
632         };
633     }
634
635     @Test
636     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
637         new JavaTestKit(getSystem()) {
638             {
639                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
640
641                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
642
643                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
644
645                 CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
646                 CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
647                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
648                 dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
649                 dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
650
651                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
652                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
653
654                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
655
656                 ByteString snapshotBytes  = fromObject(Arrays.asList(
657                         new MockRaftActorContext.MockPayload("A"),
658                         new MockRaftActorContext.MockPayload("B"),
659                         new MockRaftActorContext.MockPayload("C"),
660                         new MockRaftActorContext.MockPayload("D")));
661
662                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
663
664                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
665
666                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
667
668                 assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
669
670                 assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
671
672                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
673
674             }
675         };
676     }
677
678     private ByteString fromObject(Object snapshot) throws Exception {
679         ByteArrayOutputStream b = null;
680         ObjectOutputStream o = null;
681         try {
682             b = new ByteArrayOutputStream();
683             o = new ObjectOutputStream(b);
684             o.writeObject(snapshot);
685             byte[] snapshotBytes = b.toByteArray();
686             return ByteString.copyFrom(snapshotBytes);
687         } finally {
688             if (o != null) {
689                 o.flush();
690                 o.close();
691             }
692             if (b != null) {
693                 b.close();
694             }
695         }
696     }
697 }