1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.event.Logging;
9 import akka.japi.Creator;
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SaveSnapshotSuccess;
12 import akka.persistence.SnapshotMetadata;
13 import akka.persistence.SnapshotOffer;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.Lists;
18 import com.google.protobuf.ByteString;
19 import org.junit.After;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.DataPersistenceProvider;
22 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
24 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
30 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
31 import scala.concurrent.duration.FiniteDuration;
33 import java.io.ByteArrayInputStream;
34 import java.io.ByteArrayOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectInputStream;
37 import java.io.ObjectOutputStream;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotEquals;
48 import static org.mockito.Mockito.mock;
/**
 * Unit tests for {@link RaftActor}, exercised through the {@code MockRaftActor} subclass declared in this
 * file together with Akka's JavaTestKit and TestActorRef helpers.
 */
50 public class RaftActorTest extends AbstractActorTest {
/**
 * Resets the shared mock persistence state: clears {@link MockAkkaJournal} and sets the mock snapshot held by
 * {@link MockSnapshotStore} to {@code null}.
 */
54 public void tearDown() {
55 MockAkkaJournal.clearJournal();
56 MockSnapshotStore.setMockSnapshot(null);
/**
 * {@link RaftActor} subclass used by the tests in this file. It records callbacks through latches and keeps
 * the state recovered from a snapshot so tests can assert on them.
 */
59 public static class MockRaftActor extends RaftActor {
// Provider handed back by persistence(); assigned once in the constructor.
61 private final DataPersistenceProvider dataPersistenceProvider;
/**
 * Akka {@link Creator} that captures the constructor arguments of a {@code MockRaftActor} so that
 * {@code Props.create} can instantiate the actor.
 */
63 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
64 private final Map<String, String> peerAddresses;
65 private final String id;
66 private final Optional<ConfigParams> config;
67 private final DataPersistenceProvider dataPersistenceProvider;
// Private constructor; it is invoked by the MockRaftActor.props(...) factory methods.
69 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
70 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
71 this.peerAddresses = peerAddresses;
74 this.dataPersistenceProvider = dataPersistenceProvider;
/** Builds a new {@code MockRaftActor} from the captured id, peer addresses, config and persistence provider. */
78 public MockRaftActor create() throws Exception {
79 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
// Counted down by onRecoveryComplete(); awaited by waitForRecoveryComplete().
83 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Counted down by applyRecoverySnapshot(ByteString); exposed via getApplyRecoverySnapshotLatch().
84 private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
// Counted down by applyState(...).
85 private final CountDownLatch applyStateLatch = new CountDownLatch(1);
// Receives the elements of a recovered snapshot when that snapshot deserializes to a List.
87 private final List<Object> state;
/**
 * Creates the mock actor, passing id, peer addresses and config to the RaftActor constructor and starting
 * with an empty state list.
 *
 * @param dataPersistenceProvider provider to use; when {@code null} a new {@code PersistentDataProvider}
 *                                is created instead
 */
89 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
90 super(id, peerAddresses, config);
91 state = new ArrayList<>();
92 if(dataPersistenceProvider == null){
93 this.dataPersistenceProvider = new PersistentDataProvider();
95 this.dataPersistenceProvider = dataPersistenceProvider;
/**
 * Waits up to 5 seconds for the {@code recoveryComplete} latch to reach zero and asserts that it did.
 */
99 public void waitForRecoveryComplete() {
101 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
102 } catch (InterruptedException e) {
/** Returns the latch that {@code applyRecoverySnapshot(ByteString)} counts down. */
107 public CountDownLatch getApplyRecoverySnapshotLatch(){
108 return applyRecoverySnapshot;
// Accessor the tests use to inspect the recovered state (testRaftActorRecovery checks its size).
111 public List<Object> getState() {
/**
 * Builds Props for a {@code MockRaftActor} without an explicit persistence provider; {@code null} is passed
 * to the creator, so the actor constructor creates a {@code PersistentDataProvider}.
 */
115 public static Props props(final String id, final Map<String, String> peerAddresses,
116 Optional<ConfigParams> config){
117 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
/**
 * Builds Props for a {@code MockRaftActor} that uses the supplied {@code dataPersistenceProvider}.
 */
120 public static Props props(final String id, final Map<String, String> peerAddresses,
121 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
122 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
// Signals that applyState was invoked by counting down applyStateLatch; the arguments are not used on the line shown.
126 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
127 applyStateLatch.countDown();
// Log-recovery hook taking the maximum batch size. NOTE(review): presumably overrides a RaftActor hook — confirm.
131 protected void startLogRecoveryBatch(int maxBatchSize) {
// Log-recovery hook receiving one recovered payload. NOTE(review): presumably overrides a RaftActor hook — confirm.
135 protected void appendRecoveredLogEntry(Payload data) {
// Log-recovery hook for applying the current batch. NOTE(review): presumably overrides a RaftActor hook — confirm.
140 protected void applyCurrentLogRecoveryBatch() {
// Counts down recoveryComplete, which releases callers blocked in waitForRecoveryComplete().
144 protected void onRecoveryComplete() {
145 recoveryComplete.countDown();
/**
 * Records that a recovery snapshot was offered and copies its contents into {@code state}.
 * The latch is counted down first; the snapshot is then deserialized with {@code toObject} and, when the
 * result is a {@code List}, all of its elements are appended to {@code state}.
 */
149 protected void applyRecoverySnapshot(ByteString snapshot) {
150 applyRecoverySnapshot.countDown();
152 Object data = toObject(snapshot);
// NOTE(review): debug output written straight to stdout; consider removing it or using a logger.
153 System.out.println("!!!!!applyRecoverySnapshot: "+data);
154 if (data instanceof List) {
// NOTE(review): raw List cast produces an unchecked warning; List<?> would avoid it.
155 state.addAll((List) data);
157 } catch (Exception e) {
// Superclass hook overridden by the mock; takes no arguments.
162 @Override protected void createSnapshot() {
// Superclass hook overridden by the mock; receives the snapshot bytes.
165 @Override protected void applySnapshot(ByteString snapshot) {
// Superclass hook overridden by the mock; takes no arguments.
168 @Override protected void onStateChanged() {
/** Returns the persistence provider selected in the constructor. */
172 protected DataPersistenceProvider persistence() {
173 return this.dataPersistenceProvider;
// Overrides the superclass accessor for this actor's persistence id.
176 @Override public String persistenceId() {
/**
 * Deserializes a single object from the bytes of {@code bs} using Java serialization
 * ({@code ObjectInputStream.readObject}).
 *
 * NOTE(review): native deserialization is only appropriate here because the bytes are produced by the test
 * itself (see fromObject); also confirm that both streams are closed on every path.
 *
 * @throws ClassNotFoundException if the serialized class cannot be resolved
 * @throws IOException if reading the stream fails
 */
180 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
182 ByteArrayInputStream bis = null;
183 ObjectInputStream ois = null;
185 bis = new ByteArrayInputStream(bs.toByteArray());
186 ois = new ObjectInputStream(bis);
187 obj = ois.readObject();
/** Convenience accessor returning the replicated log held by this actor's RaftActorContext. */
199 public ReplicatedLog getReplicatedLog(){
200 return this.getRaftActorContext().getReplicatedLog();
/**
 * JavaTestKit that spawns a {@code MockRaftActor} and offers helpers for waiting on its startup and for
 * querying it for the current leader.
 */
206 private static class RaftActorTestKit extends JavaTestKit {
207 private final ActorRef raftActor;
/**
 * Spawns a {@code MockRaftActor} named {@code actorName} in this kit's actor system, with an empty peer map
 * and no ConfigParams.
 */
209 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
// NOTE(review): raw Collections.EMPTY_MAP causes an unchecked conversion; Collections.<String, String>emptyMap() is the typed form.
212 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
213 Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
/**
 * Runs an EventFilter over Info-level log events originating from the raft actor, expecting exactly one
 * occurrence of the message "Switching from behavior Candidate to Leader".
 */
218 public boolean waitForStartup(){
219 // Wait for a specific log message to show up
221 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
224 protected Boolean run() {
227 }.from(raftActor.path().toString())
228 .message("Switching from behavior Candidate to Leader")
229 .occurrences(1).exec();
/**
 * Sends {@code FindLeader} to the raft actor with this kit as sender, expects a {@code FindLeaderReply}
 * within 5 seconds and asserts that its leader actor equals {@code expectedLeader}.
 */
234 public void findLeader(final String expectedLeader){
235 raftActor.tell(new FindLeader(), getRef());
237 FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
238 assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
// Accessor used by tests to reach the spawned raft actor (e.g. to read its path).
241 public ActorRef getRaftActor() {
/**
 * Starts a RaftActorTestKit whose actor is named "testConstruction" and asserts that waitForStartup()
 * returned {@code true}.
 */
248 public void testConstruction() {
249 boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
250 assertEquals(true, started);
/**
 * Starts a peerless raft actor named "testFindLeader", waits for startup, and asserts that the FindLeader
 * reply names the actor's own path as leader.
 */
254 public void testFindLeaderWhenLeaderIsSelf(){
255 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
256 kit.waitForStartup();
257 kit.findLeader(kit.getRaftActor().path().toString());
/**
 * Verifies recovery from a stored snapshot plus journal entries.
 * The test seeds MockSnapshotStore with a snapshot (state payloads A-D and one unapplied entry with payload E)
 * and MockAkkaJournal with later entries and an ApplyLogEntries marker for persistence id "follower10".
 * It then stops the first actor instance, re-creates the actor through TestActorRef, waits for recovery and
 * asserts on the recovered log size, last index, last applied, commit index and state size.
 */
261 public void testRaftActorRecovery() throws Exception {
262 new JavaTestKit(getSystem()) {{
263 String persistenceId = "follower10";
265 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
266 // Set the heartbeat interval high to essentially disable election otherwise the test
267 // may fail if the actor is switched to Leader and the commitIndex is set to the last
269 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
271 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
272 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
// Watch the actor so that its termination is reported to this kit as a Terminated message.
274 watch(followerActor);
276 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
277 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
278 new MockRaftActorContext.MockPayload("E"));
279 snapshotUnappliedEntries.add(entry1);
281 int lastAppliedDuringSnapshotCapture = 3;
282 int lastIndexDuringSnapshotCapture = 4;
284 // 4 messages as part of snapshot, which are applied to state
285 ByteString snapshotBytes = fromObject(Arrays.asList(
286 new MockRaftActorContext.MockPayload("A"),
287 new MockRaftActorContext.MockPayload("B"),
288 new MockRaftActorContext.MockPayload("C"),
289 new MockRaftActorContext.MockPayload("D")));
291 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
292 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
293 lastAppliedDuringSnapshotCapture, 1);
294 MockSnapshotStore.setMockSnapshot(snapshot);
295 MockSnapshotStore.setPersistenceId(persistenceId);
297 // add more entries after snapshot is taken
298 List<ReplicatedLogEntry> entries = new ArrayList<>();
299 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
300 new MockRaftActorContext.MockPayload("F"));
301 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
302 new MockRaftActorContext.MockPayload("G"));
303 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
304 new MockRaftActorContext.MockPayload("H"));
309 int lastAppliedToState = 5;
312 MockAkkaJournal.addToJournal(5, entry2);
313 // 2 entries are applied to state besides the 4 entries in snapshot
314 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
315 MockAkkaJournal.addToJournal(7, entry3);
316 MockAkkaJournal.addToJournal(8, entry4);
// Stop the first instance and wait for the Terminated notification before re-creating the actor.
// NOTE(review): the sender is passed as null; ActorRef.noSender() is Akka's documented way to express "no sender".
319 followerActor.tell(PoisonPill.getInstance(), null);
320 expectMsgClass(duration("5 seconds"), Terminated.class);
322 unwatch(followerActor);
324 //reinstate the actor
325 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
326 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
327 Optional.<ConfigParams>of(config)));
329 ref.underlyingActor().waitForRecoveryComplete();
// The recovered log is expected to hold the snapshot's unapplied entries plus the journal entries.
331 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
332 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
333 context.getReplicatedLog().size());
334 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
335 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
336 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
337 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
342 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
343 * process recovery messages
/**
 * Creates a MockRaftActor through the props overload that passes no persistence provider, waits for Akka's
 * own recovery to finish, then calls onReceiveRecover directly with a SnapshotOffer, two log entries,
 * ApplyLogEntries, DeleteEntries, UpdateElectionTerm and RecoveryCompleted, asserting after each step that
 * the actor's state changed accordingly.
 */
349 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
350 new JavaTestKit(getSystem()) {
352 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
354 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
356 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
358 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
359 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
361 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
363 // Wait for akka's recovery to complete so it doesn't interfere.
364 mockRaftActor.waitForRecoveryComplete();
366 ByteString snapshotBytes = fromObject(Arrays.asList(
367 new MockRaftActorContext.MockPayload("A"),
368 new MockRaftActorContext.MockPayload("B"),
369 new MockRaftActorContext.MockPayload("C"),
370 new MockRaftActorContext.MockPayload("D")));
372 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
373 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
// Offering the snapshot must reach MockRaftActor.applyRecoverySnapshot, which counts the latch down.
375 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
377 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
379 assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
// Each recovered log entry is expected to grow the replicated log by one.
381 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
383 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
385 assertEquals("add replicated log entry", 1, replicatedLog.size());
387 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
389 assertEquals("add replicated log entry", 2, replicatedLog.size());
391 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
393 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
395 // The snapshot had 4 items + we added 2 more items during the test
396 // We start removing from 5 and we should get 1 item in the replicated log
397 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
399 assertEquals("remove log entries", 1, replicatedLog.size());
401 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
403 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
404 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
406 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Stop the actor once the assertions are done.
408 mockActorRef.tell(PoisonPill.getInstance(), getRef());
414 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
415 * not process recovery messages
/**
 * Creates a MockRaftActor backed by a DataPersistenceProviderMonitor and feeds it the same kinds of recovery
 * messages as the "applicable" variant, asserting that none of them takes effect: the snapshot latch is not
 * counted down within 1 second, the replicated log stays empty, the commit index stays at -1 and the
 * election term information is not updated.
 */
420 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
421 new JavaTestKit(getSystem()) {
423 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
425 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
427 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
429 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
430 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
432 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
434 // Wait for akka's recovery to complete so it doesn't interfere.
435 mockRaftActor.waitForRecoveryComplete();
437 ByteString snapshotBytes = fromObject(Arrays.asList(
438 new MockRaftActorContext.MockPayload("A"),
439 new MockRaftActorContext.MockPayload("B"),
440 new MockRaftActorContext.MockPayload("C"),
441 new MockRaftActorContext.MockPayload("D")));
443 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
444 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
446 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
448 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
// Expecting false here: await must time out after 1 second because the snapshot is not applied.
450 assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
452 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
454 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
456 assertEquals("add replicated log entry", 0, replicatedLog.size());
458 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
460 assertEquals("add replicated log entry", 0, replicatedLog.size());
462 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
464 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
466 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
468 assertEquals("remove log entries", 0, replicatedLog.size());
470 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
472 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
473 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
475 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Stop the actor once the assertions are done.
477 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Installs a persist latch (count 1) on a DataPersistenceProviderMonitor, calls
 * {@code updateAndPersist(10, "foobar")} on the actor's term information and asserts that the latch reaches
 * zero within 5 seconds.
 */
483 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
484 new JavaTestKit(getSystem()) {
486 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
488 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
490 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
492 CountDownLatch persistLatch = new CountDownLatch(1);
493 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
494 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
496 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
497 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
499 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
501 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
503 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor once the assertion is done.
505 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Installs a persist latch (count 1) on a DataPersistenceProviderMonitor, appends one entry through
 * {@code appendAndPersist} on the actor's replicated log and asserts that the latch reaches zero within
 * 5 seconds.
 */
512 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
513 new JavaTestKit(getSystem()) {
515 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
517 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
519 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
521 CountDownLatch persistLatch = new CountDownLatch(1);
522 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
523 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
525 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
526 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
528 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
530 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
532 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor once the assertion is done.
534 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Installs a persist latch with count 2 on a DataPersistenceProviderMonitor, performs an
 * {@code appendAndPersist} followed by {@code removeFromAndPersist(0)} and asserts that the latch reaches
 * zero within 5 seconds.
 * NOTE(review): the count of 2 presumably corresponds to one persist per call — confirm.
 */
541 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
542 new JavaTestKit(getSystem()) {
544 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
546 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
548 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
550 CountDownLatch persistLatch = new CountDownLatch(2);
551 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
552 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
554 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
555 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
557 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
559 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
561 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
563 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor once the assertion is done.
565 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Installs a persist latch (count 1) on a DataPersistenceProviderMonitor, delivers
 * {@code ApplyLogEntries(10)} through onReceiveCommand and asserts that the latch reaches zero within
 * 5 seconds.
 */
572 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
573 new JavaTestKit(getSystem()) {
575 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
577 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
579 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
581 CountDownLatch persistLatch = new CountDownLatch(1);
582 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
583 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
585 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
586 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
588 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
590 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
592 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor once the assertion is done.
594 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Installs a save-snapshot latch (count 1) on a DataPersistenceProviderMonitor, delivers
 * {@code CaptureSnapshot(-1,1,-1,1)} followed by a {@code CaptureSnapshotReply} carrying serialized payloads
 * A-D through onReceiveCommand, and asserts that the latch reaches zero within 5 seconds.
 */
601 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
602 new JavaTestKit(getSystem()) {
604 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
606 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
608 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Despite its name, this latch is registered as the monitor's save-snapshot latch.
610 CountDownLatch persistLatch = new CountDownLatch(1);
611 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
612 dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
614 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
615 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
617 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
619 ByteString snapshotBytes = fromObject(Arrays.asList(
620 new MockRaftActorContext.MockPayload("A"),
621 new MockRaftActorContext.MockPayload("B"),
622 new MockRaftActorContext.MockPayload("C"),
623 new MockRaftActorContext.MockPayload("D")));
625 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
627 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
629 assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor once the assertion is done.
631 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Installs delete-messages and delete-snapshots latches (count 1 each) on a DataPersistenceProviderMonitor,
 * delivers {@code CaptureSnapshot}, {@code CaptureSnapshotReply} and {@code SaveSnapshotSuccess} through
 * onReceiveCommand, and asserts that both latches reach zero within 5 seconds each.
 */
638 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
639 new JavaTestKit(getSystem()) {
641 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
643 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
645 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
647 CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
648 CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
649 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
650 dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
651 dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
653 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
654 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
656 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
658 ByteString snapshotBytes = fromObject(Arrays.asList(
659 new MockRaftActorContext.MockPayload("A"),
660 new MockRaftActorContext.MockPayload("B"),
661 new MockRaftActorContext.MockPayload("C"),
662 new MockRaftActorContext.MockPayload("D")));
664 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
666 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
// The snapshot metadata uses the persistence id "foo", not this test's persistenceId.
668 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
670 assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
672 assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
// Stop the actor once the assertions are done.
674 mockActorRef.tell(PoisonPill.getInstance(), getRef());
/**
 * Serializes {@code snapshot} with Java serialization ({@code ObjectOutputStream.writeObject}) and returns
 * the resulting bytes wrapped in a {@link ByteString}.
 *
 * NOTE(review): confirm that both streams are closed on every path.
 */
680 private ByteString fromObject(Object snapshot) throws Exception {
681 ByteArrayOutputStream b = null;
682 ObjectOutputStream o = null;
684 b = new ByteArrayOutputStream();
685 o = new ObjectOutputStream(b);
686 o.writeObject(snapshot);
687 byte[] snapshotBytes = b.toByteArray();
688 return ByteString.copyFrom(snapshotBytes);