1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.event.Logging;
9 import akka.japi.Creator;
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SaveSnapshotSuccess;
12 import akka.persistence.SnapshotMetadata;
13 import akka.persistence.SnapshotOffer;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.Lists;
18 import com.google.protobuf.ByteString;
19 import org.junit.After;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.DataPersistenceProvider;
22 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
23 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
24 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
26 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
30 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
31 import scala.concurrent.duration.FiniteDuration;
33 import java.io.ByteArrayInputStream;
34 import java.io.ByteArrayOutputStream;
35 import java.io.IOException;
36 import java.io.ObjectInputStream;
37 import java.io.ObjectOutputStream;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotEquals;
48 import static org.mockito.Mockito.mock;
50 public class RaftActorTest extends AbstractActorTest {
54 public void tearDown() {
55 MockAkkaJournal.clearJournal();
56 MockSnapshotStore.setMockSnapshot(null);
59 public static class MockRaftActor extends RaftActor {
61 private final DataPersistenceProvider dataPersistenceProvider;
63 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
64 private final Map<String, String> peerAddresses;
65 private final String id;
66 private final Optional<ConfigParams> config;
67 private final DataPersistenceProvider dataPersistenceProvider;
69 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
70 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
71 this.peerAddresses = peerAddresses;
74 this.dataPersistenceProvider = dataPersistenceProvider;
78 public MockRaftActor create() throws Exception {
79 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
83 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
84 private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
85 private final CountDownLatch applyStateLatch = new CountDownLatch(1);
87 private final List<Object> state;
89 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
90 super(id, peerAddresses, config);
91 state = new ArrayList<>();
92 if(dataPersistenceProvider == null){
93 this.dataPersistenceProvider = new PersistentDataProvider();
95 this.dataPersistenceProvider = dataPersistenceProvider;
99 public void waitForRecoveryComplete() {
101 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
102 } catch (InterruptedException e) {
107 public CountDownLatch getApplyRecoverySnapshotLatch(){
108 return applyRecoverySnapshot;
111 public List<Object> getState() {
115 public static Props props(final String id, final Map<String, String> peerAddresses,
116 Optional<ConfigParams> config){
117 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
120 public static Props props(final String id, final Map<String, String> peerAddresses,
121 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
122 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
126 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
127 applyStateLatch.countDown();
131 protected void startLogRecoveryBatch(int maxBatchSize) {
135 protected void appendRecoveredLogEntry(Payload data) {
140 protected void applyCurrentLogRecoveryBatch() {
144 protected void onRecoveryComplete() {
145 recoveryComplete.countDown();
149 protected void applyRecoverySnapshot(ByteString snapshot) {
150 applyRecoverySnapshot.countDown();
152 Object data = toObject(snapshot);
153 System.out.println("!!!!!applyRecoverySnapshot: "+data);
154 if (data instanceof List) {
155 state.addAll((List) data);
157 } catch (Exception e) {
162 @Override protected void createSnapshot() {
165 @Override protected void applySnapshot(ByteString snapshot) {
168 @Override protected void onStateChanged() {
172 protected DataPersistenceProvider persistence() {
173 return this.dataPersistenceProvider;
176 @Override public String persistenceId() {
180 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
182 ByteArrayInputStream bis = null;
183 ObjectInputStream ois = null;
185 bis = new ByteArrayInputStream(bs.toByteArray());
186 ois = new ObjectInputStream(bis);
187 obj = ois.readObject();
199 public ReplicatedLog getReplicatedLog(){
200 return this.getRaftActorContext().getReplicatedLog();
206 private static class RaftActorTestKit extends JavaTestKit {
207 private final ActorRef raftActor;
209 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
212 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
213 Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
218 public boolean waitForStartup(){
219 // Wait for a specific log message to show up
221 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
224 protected Boolean run() {
227 }.from(raftActor.path().toString())
228 .message("Switching from behavior Candidate to Leader")
229 .occurrences(1).exec();
234 public void findLeader(final String expectedLeader){
235 raftActor.tell(new FindLeader(), getRef());
237 FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
238 assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
241 public ActorRef getRaftActor() {
248 public void testConstruction() {
249 boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
250 assertEquals(true, started);
254 public void testFindLeaderWhenLeaderIsSelf(){
255 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
256 kit.waitForStartup();
257 kit.findLeader(kit.getRaftActor().path().toString());
261 public void testRaftActorRecovery() throws Exception {
262 new JavaTestKit(getSystem()) {{
263 String persistenceId = "follower10";
265 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
266 // Set the heartbeat interval high to essentially disable election otherwise the test
267 // may fail if the actor is switched to Leader and the commitIndex is set to the last
269 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
271 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
272 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
274 watch(followerActor);
276 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
277 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
278 new MockRaftActorContext.MockPayload("E"));
279 snapshotUnappliedEntries.add(entry1);
281 int lastAppliedDuringSnapshotCapture = 3;
282 int lastIndexDuringSnapshotCapture = 4;
284 // 4 messages as part of snapshot, which are applied to state
285 ByteString snapshotBytes = fromObject(Arrays.asList(
286 new MockRaftActorContext.MockPayload("A"),
287 new MockRaftActorContext.MockPayload("B"),
288 new MockRaftActorContext.MockPayload("C"),
289 new MockRaftActorContext.MockPayload("D")));
291 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
292 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
293 lastAppliedDuringSnapshotCapture, 1);
294 MockSnapshotStore.setMockSnapshot(snapshot);
295 MockSnapshotStore.setPersistenceId(persistenceId);
297 // add more entries after snapshot is taken
298 List<ReplicatedLogEntry> entries = new ArrayList<>();
299 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
300 new MockRaftActorContext.MockPayload("F"));
301 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
302 new MockRaftActorContext.MockPayload("G"));
303 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
304 new MockRaftActorContext.MockPayload("H"));
309 int lastAppliedToState = 5;
312 MockAkkaJournal.addToJournal(5, entry2);
313 // 2 entries are applied to state besides the 4 entries in snapshot
314 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
315 MockAkkaJournal.addToJournal(7, entry3);
316 MockAkkaJournal.addToJournal(8, entry4);
319 followerActor.tell(PoisonPill.getInstance(), null);
320 expectMsgClass(duration("5 seconds"), Terminated.class);
322 unwatch(followerActor);
324 //reinstate the actor
325 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
326 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
327 Optional.<ConfigParams>of(config)));
329 ref.underlyingActor().waitForRecoveryComplete();
331 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
332 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
333 context.getReplicatedLog().size());
334 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
335 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
336 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
337 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
342 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
343 * process recovery messages
349 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
350 new JavaTestKit(getSystem()) {
352 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
354 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
356 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
358 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
359 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
361 ByteString snapshotBytes = fromObject(Arrays.asList(
362 new MockRaftActorContext.MockPayload("A"),
363 new MockRaftActorContext.MockPayload("B"),
364 new MockRaftActorContext.MockPayload("C"),
365 new MockRaftActorContext.MockPayload("D")));
367 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
368 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
370 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
372 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
374 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
376 assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
378 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
380 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
382 assertEquals("add replicated log entry", 1, replicatedLog.size());
384 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
386 assertEquals("add replicated log entry", 2, replicatedLog.size());
388 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
390 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
392 // The snapshot had 4 items + we added 2 more items during the test
393 // We start removing from 5 and we should get 1 item in the replicated log
394 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
396 assertEquals("remove log entries", 1, replicatedLog.size());
398 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
400 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
401 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
403 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
405 mockRaftActor.waitForRecoveryComplete();
407 mockActorRef.tell(PoisonPill.getInstance(), getRef());
413 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
414 * not process recovery messages
419 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
420 new JavaTestKit(getSystem()) {
422 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
424 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
426 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
428 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
429 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
431 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
433 ByteString snapshotBytes = fromObject(Arrays.asList(
434 new MockRaftActorContext.MockPayload("A"),
435 new MockRaftActorContext.MockPayload("B"),
436 new MockRaftActorContext.MockPayload("C"),
437 new MockRaftActorContext.MockPayload("D")));
439 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
440 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
442 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
444 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
446 assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
448 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
450 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
452 assertEquals("add replicated log entry", 0, replicatedLog.size());
454 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
456 assertEquals("add replicated log entry", 0, replicatedLog.size());
458 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
460 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
462 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
464 assertEquals("remove log entries", 0, replicatedLog.size());
466 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
468 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
469 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
471 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
473 mockRaftActor.waitForRecoveryComplete();
475 mockActorRef.tell(PoisonPill.getInstance(), getRef());
481 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
482 new JavaTestKit(getSystem()) {
484 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
486 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
488 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
490 CountDownLatch persistLatch = new CountDownLatch(1);
491 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
492 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
494 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
495 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
497 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
499 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
501 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
503 mockActorRef.tell(PoisonPill.getInstance(), getRef());
510 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
511 new JavaTestKit(getSystem()) {
513 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
515 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
517 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
519 CountDownLatch persistLatch = new CountDownLatch(1);
520 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
521 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
523 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
524 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
526 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
528 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
530 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
532 mockActorRef.tell(PoisonPill.getInstance(), getRef());
539 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
540 new JavaTestKit(getSystem()) {
542 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
544 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
546 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
548 CountDownLatch persistLatch = new CountDownLatch(2);
549 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
550 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
552 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
553 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
555 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
557 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
559 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
561 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
563 mockActorRef.tell(PoisonPill.getInstance(), getRef());
570 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
571 new JavaTestKit(getSystem()) {
573 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
575 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
577 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
579 CountDownLatch persistLatch = new CountDownLatch(1);
580 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
581 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
583 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
584 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
586 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
588 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
590 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
592 mockActorRef.tell(PoisonPill.getInstance(), getRef());
599 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
600 new JavaTestKit(getSystem()) {
602 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
604 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
606 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
608 CountDownLatch persistLatch = new CountDownLatch(1);
609 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
610 dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
612 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
613 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
615 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
617 ByteString snapshotBytes = fromObject(Arrays.asList(
618 new MockRaftActorContext.MockPayload("A"),
619 new MockRaftActorContext.MockPayload("B"),
620 new MockRaftActorContext.MockPayload("C"),
621 new MockRaftActorContext.MockPayload("D")));
623 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
625 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
627 assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
629 mockActorRef.tell(PoisonPill.getInstance(), getRef());
636 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
637 new JavaTestKit(getSystem()) {
639 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
641 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
643 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
645 CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
646 CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
647 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
648 dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
649 dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
651 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
652 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
654 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
656 ByteString snapshotBytes = fromObject(Arrays.asList(
657 new MockRaftActorContext.MockPayload("A"),
658 new MockRaftActorContext.MockPayload("B"),
659 new MockRaftActorContext.MockPayload("C"),
660 new MockRaftActorContext.MockPayload("D")));
662 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
664 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
666 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
668 assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
670 assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
672 mockActorRef.tell(PoisonPill.getInstance(), getRef());
678 private ByteString fromObject(Object snapshot) throws Exception {
679 ByteArrayOutputStream b = null;
680 ObjectOutputStream o = null;
682 b = new ByteArrayOutputStream();
683 o = new ObjectOutputStream(b);
684 o.writeObject(snapshot);
685 byte[] snapshotBytes = b.toByteArray();
686 return ByteString.copyFrom(snapshotBytes);