Merge "Bug 2294: Handle Shard backwards compatibility"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.event.Logging;
9 import akka.japi.Creator;
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SaveSnapshotSuccess;
12 import akka.persistence.SnapshotMetadata;
13 import akka.persistence.SnapshotOffer;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.Lists;
18 import com.google.protobuf.ByteString;
19 import org.junit.After;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.DataPersistenceProvider;
22 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
24 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
30 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
31 import scala.concurrent.duration.FiniteDuration;
32
33 import java.io.ByteArrayInputStream;
34 import java.io.ByteArrayOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectInputStream;
37 import java.io.ObjectOutputStream;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotEquals;
48 import static org.mockito.Mockito.mock;
49
50 public class RaftActorTest extends AbstractActorTest {
51
52
53     @After
54     public void tearDown() {
55         MockAkkaJournal.clearJournal();
56         MockSnapshotStore.setMockSnapshot(null);
57     }
58
59     public static class MockRaftActor extends RaftActor {
60
61         private final DataPersistenceProvider dataPersistenceProvider;
62
63         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
64             private final Map<String, String> peerAddresses;
65             private final String id;
66             private final Optional<ConfigParams> config;
67             private final DataPersistenceProvider dataPersistenceProvider;
68
69             private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
70                     Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
71                 this.peerAddresses = peerAddresses;
72                 this.id = id;
73                 this.config = config;
74                 this.dataPersistenceProvider = dataPersistenceProvider;
75             }
76
77             @Override
78             public MockRaftActor create() throws Exception {
79                 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
80             }
81         }
82
83         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
84         private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
85         private final CountDownLatch applyStateLatch = new CountDownLatch(1);
86
87         private final List<Object> state;
88
89         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
90             super(id, peerAddresses, config);
91             state = new ArrayList<>();
92             if(dataPersistenceProvider == null){
93                 this.dataPersistenceProvider = new PersistentDataProvider();
94             } else {
95                 this.dataPersistenceProvider = dataPersistenceProvider;
96             }
97         }
98
99         public void waitForRecoveryComplete() {
100             try {
101                 assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
102             } catch (InterruptedException e) {
103                 e.printStackTrace();
104             }
105         }
106
107         public CountDownLatch getApplyRecoverySnapshotLatch(){
108             return applyRecoverySnapshot;
109         }
110
111         public List<Object> getState() {
112             return state;
113         }
114
115         public static Props props(final String id, final Map<String, String> peerAddresses,
116                 Optional<ConfigParams> config){
117             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
118         }
119
120         public static Props props(final String id, final Map<String, String> peerAddresses,
121                                   Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
122             return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
123         }
124
125
126         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
127             applyStateLatch.countDown();
128         }
129
130         @Override
131         protected void startLogRecoveryBatch(int maxBatchSize) {
132         }
133
134         @Override
135         protected void appendRecoveredLogEntry(Payload data) {
136             state.add(data);
137         }
138
139         @Override
140         protected void applyCurrentLogRecoveryBatch() {
141         }
142
143         @Override
144         protected void onRecoveryComplete() {
145             recoveryComplete.countDown();
146         }
147
148         @Override
149         protected void applyRecoverySnapshot(ByteString snapshot) {
150             applyRecoverySnapshot.countDown();
151             try {
152                 Object data = toObject(snapshot);
153                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
154                 if (data instanceof List) {
155                     state.addAll((List) data);
156                 }
157             } catch (Exception e) {
158                 e.printStackTrace();
159             }
160         }
161
162         @Override protected void createSnapshot() {
163         }
164
165         @Override protected void applySnapshot(ByteString snapshot) {
166         }
167
168         @Override protected void onStateChanged() {
169         }
170
171         @Override
172         protected DataPersistenceProvider persistence() {
173             return this.dataPersistenceProvider;
174         }
175
176         @Override public String persistenceId() {
177             return this.getId();
178         }
179
180         private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
181             Object obj = null;
182             ByteArrayInputStream bis = null;
183             ObjectInputStream ois = null;
184             try {
185                 bis = new ByteArrayInputStream(bs.toByteArray());
186                 ois = new ObjectInputStream(bis);
187                 obj = ois.readObject();
188             } finally {
189                 if (bis != null) {
190                     bis.close();
191                 }
192                 if (ois != null) {
193                     ois.close();
194                 }
195             }
196             return obj;
197         }
198
199         public ReplicatedLog getReplicatedLog(){
200             return this.getRaftActorContext().getReplicatedLog();
201         }
202
203     }
204
205
206     private static class RaftActorTestKit extends JavaTestKit {
207         private final ActorRef raftActor;
208
209         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
210             super(actorSystem);
211
212             raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
213                     Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
214
215         }
216
217
218         public boolean waitForStartup(){
219             // Wait for a specific log message to show up
220             return
221                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
222                 ) {
223                     @Override
224                     protected Boolean run() {
225                         return true;
226                     }
227                 }.from(raftActor.path().toString())
228                     .message("Switching from behavior Candidate to Leader")
229                     .occurrences(1).exec();
230
231
232         }
233
234         public void findLeader(final String expectedLeader){
235             raftActor.tell(new FindLeader(), getRef());
236
237             FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
238             assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
239         }
240
241         public ActorRef getRaftActor() {
242             return raftActor;
243         }
244     }
245
246
247     @Test
248     public void testConstruction() {
249         boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
250         assertEquals(true, started);
251     }
252
253     @Test
254     public void testFindLeaderWhenLeaderIsSelf(){
255         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
256         kit.waitForStartup();
257         kit.findLeader(kit.getRaftActor().path().toString());
258     }
259
260     @Test
261     public void testRaftActorRecovery() throws Exception {
262         new JavaTestKit(getSystem()) {{
263             String persistenceId = "follower10";
264
265             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
266             // Set the heartbeat interval high to essentially disable election otherwise the test
267             // may fail if the actor is switched to Leader and the commitIndex is set to the last
268             // log entry.
269             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
270
271             ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
272                     Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
273
274             watch(followerActor);
275
276             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
277             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
278                     new MockRaftActorContext.MockPayload("E"));
279             snapshotUnappliedEntries.add(entry1);
280
281             int lastAppliedDuringSnapshotCapture = 3;
282             int lastIndexDuringSnapshotCapture = 4;
283
284                 // 4 messages as part of snapshot, which are applied to state
285             ByteString snapshotBytes  = fromObject(Arrays.asList(
286                         new MockRaftActorContext.MockPayload("A"),
287                         new MockRaftActorContext.MockPayload("B"),
288                         new MockRaftActorContext.MockPayload("C"),
289                         new MockRaftActorContext.MockPayload("D")));
290
291             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
292                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
293                     lastAppliedDuringSnapshotCapture, 1);
294             MockSnapshotStore.setMockSnapshot(snapshot);
295             MockSnapshotStore.setPersistenceId(persistenceId);
296
297             // add more entries after snapshot is taken
298             List<ReplicatedLogEntry> entries = new ArrayList<>();
299             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
300                     new MockRaftActorContext.MockPayload("F"));
301             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
302                     new MockRaftActorContext.MockPayload("G"));
303             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
304                     new MockRaftActorContext.MockPayload("H"));
305             entries.add(entry2);
306             entries.add(entry3);
307             entries.add(entry4);
308
309             int lastAppliedToState = 5;
310             int lastIndex = 7;
311
312             MockAkkaJournal.addToJournal(5, entry2);
313             // 2 entries are applied to state besides the 4 entries in snapshot
314             MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
315             MockAkkaJournal.addToJournal(7, entry3);
316             MockAkkaJournal.addToJournal(8, entry4);
317
318             // kill the actor
319             followerActor.tell(PoisonPill.getInstance(), null);
320             expectMsgClass(duration("5 seconds"), Terminated.class);
321
322             unwatch(followerActor);
323
324             //reinstate the actor
325             TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
326                     MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
327                             Optional.<ConfigParams>of(config)));
328
329             ref.underlyingActor().waitForRecoveryComplete();
330
331             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
332             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
333                     context.getReplicatedLog().size());
334             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
335             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
336             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
337             assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
338         }};
339     }
340
341     /**
342      * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
343      * process recovery messages
344      *
345      * @throws Exception
346      */
347
348     @Test
349     public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
350         new JavaTestKit(getSystem()) {
351             {
352                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
353
354                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
355
356                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
357
358                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
359                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
360
361                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
362
363                 // Wait for akka's recovery to complete so it doesn't interfere.
364                 mockRaftActor.waitForRecoveryComplete();
365
366                 ByteString snapshotBytes  = fromObject(Arrays.asList(
367                         new MockRaftActorContext.MockPayload("A"),
368                         new MockRaftActorContext.MockPayload("B"),
369                         new MockRaftActorContext.MockPayload("C"),
370                         new MockRaftActorContext.MockPayload("D")));
371
372                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
373                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
374
375                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
376
377                 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
378
379                 assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
380
381                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
382
383                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
384
385                 assertEquals("add replicated log entry", 1, replicatedLog.size());
386
387                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
388
389                 assertEquals("add replicated log entry", 2, replicatedLog.size());
390
391                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
392
393                 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
394
395                 // The snapshot had 4 items + we added 2 more items during the test
396                 // We start removing from 5 and we should get 1 item in the replicated log
397                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
398
399                 assertEquals("remove log entries", 1, replicatedLog.size());
400
401                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
402
403                 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
404                 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
405
406                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
407
408                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
409
410             }};
411     }
412
413     /**
414      * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
415      * not process recovery messages
416      *
417      * @throws Exception
418      */
419     @Test
420     public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
421         new JavaTestKit(getSystem()) {
422             {
423                 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
424
425                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
426
427                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
428
429                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
430                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
431
432                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
433
434                 // Wait for akka's recovery to complete so it doesn't interfere.
435                 mockRaftActor.waitForRecoveryComplete();
436
437                 ByteString snapshotBytes  = fromObject(Arrays.asList(
438                         new MockRaftActorContext.MockPayload("A"),
439                         new MockRaftActorContext.MockPayload("B"),
440                         new MockRaftActorContext.MockPayload("C"),
441                         new MockRaftActorContext.MockPayload("D")));
442
443                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
444                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
445
446                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
447
448                 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
449
450                 assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
451
452                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
453
454                 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
455
456                 assertEquals("add replicated log entry", 0, replicatedLog.size());
457
458                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
459
460                 assertEquals("add replicated log entry", 0, replicatedLog.size());
461
462                 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
463
464                 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
465
466                 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
467
468                 assertEquals("remove log entries", 0, replicatedLog.size());
469
470                 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
471
472                 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
473                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
474
475                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
476
477                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
478             }};
479     }
480
481
482     @Test
483     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
484         new JavaTestKit(getSystem()) {
485             {
486                 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
487
488                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
489
490                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
491
492                 CountDownLatch persistLatch = new CountDownLatch(1);
493                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
494                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
495
496                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
497                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
498
499                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
500
501                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
502
503                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
504
505                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
506
507             }
508         };
509     }
510
511     @Test
512     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
513         new JavaTestKit(getSystem()) {
514             {
515                 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
516
517                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
518
519                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
520
521                 CountDownLatch persistLatch = new CountDownLatch(1);
522                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
523                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
524
525                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
526                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
527
528                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
529
530                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
531
532                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
533
534                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
535
536             }
537         };
538     }
539
540     @Test
541     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
542         new JavaTestKit(getSystem()) {
543             {
544                 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
545
546                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
547
548                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
549
550                 CountDownLatch persistLatch = new CountDownLatch(2);
551                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
552                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
553
554                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
555                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
556
557                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
558
559                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
560
561                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
562
563                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
564
565                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
566
567             }
568         };
569     }
570
571     @Test
572     public void testApplyLogEntriesCallsDataPersistence() throws Exception {
573         new JavaTestKit(getSystem()) {
574             {
575                 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
576
577                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
578
579                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
580
581                 CountDownLatch persistLatch = new CountDownLatch(1);
582                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
583                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
584
585                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
586                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
587
588                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
589
590                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
591
592                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
593
594                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
595
596             }
597         };
598     }
599
600     @Test
601     public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
602         new JavaTestKit(getSystem()) {
603             {
604                 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
605
606                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
607
608                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
609
610                 CountDownLatch persistLatch = new CountDownLatch(1);
611                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
612                 dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
613
614                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
615                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
616
617                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
618
619                 ByteString snapshotBytes  = fromObject(Arrays.asList(
620                         new MockRaftActorContext.MockPayload("A"),
621                         new MockRaftActorContext.MockPayload("B"),
622                         new MockRaftActorContext.MockPayload("C"),
623                         new MockRaftActorContext.MockPayload("D")));
624
625                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
626
627                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
628
629                 assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
630
631                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
632
633             }
634         };
635     }
636
637     @Test
638     public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
639         new JavaTestKit(getSystem()) {
640             {
641                 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
642
643                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
644
645                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
646
647                 CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
648                 CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
649                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
650                 dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
651                 dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
652
653                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
654                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
655
656                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
657
658                 ByteString snapshotBytes  = fromObject(Arrays.asList(
659                         new MockRaftActorContext.MockPayload("A"),
660                         new MockRaftActorContext.MockPayload("B"),
661                         new MockRaftActorContext.MockPayload("C"),
662                         new MockRaftActorContext.MockPayload("D")));
663
664                 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
665
666                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
667
668                 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
669
670                 assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
671
672                 assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
673
674                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
675
676             }
677         };
678     }
679
680     private ByteString fromObject(Object snapshot) throws Exception {
681         ByteArrayOutputStream b = null;
682         ObjectOutputStream o = null;
683         try {
684             b = new ByteArrayOutputStream();
685             o = new ObjectOutputStream(b);
686             o.writeObject(snapshot);
687             byte[] snapshotBytes = b.toByteArray();
688             return ByteString.copyFrom(snapshotBytes);
689         } finally {
690             if (o != null) {
691                 o.flush();
692                 o.close();
693             }
694             if (b != null) {
695                 b.close();
696             }
697         }
698     }
699 }