1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
35 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
36 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
37 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
38 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
42 import scala.concurrent.duration.FiniteDuration;
44 import java.io.ByteArrayInputStream;
45 import java.io.ByteArrayOutputStream;
46 import java.io.IOException;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.util.ArrayList;
50 import java.util.Arrays;
51 import java.util.Collections;
52 import java.util.List;
54 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.TimeoutException;
58 import static org.junit.Assert.assertEquals;
59 import static org.junit.Assert.assertNotEquals;
60 import static org.junit.Assert.assertNotNull;
61 import static org.junit.Assert.assertNull;
62 import static org.junit.Assert.assertTrue;
63 import static org.mockito.Matchers.any;
64 import static org.mockito.Matchers.anyObject;
65 import static org.mockito.Matchers.eq;
66 import static org.mockito.Mockito.doReturn;
67 import static org.mockito.Mockito.mock;
68 import static org.mockito.Mockito.times;
69 import static org.mockito.Mockito.verify;
71 public class RaftActorTest extends AbstractActorTest {
75 public void tearDown() {
76 MockAkkaJournal.clearJournal();
77 MockSnapshotStore.setMockSnapshot(null);
80 public static class MockRaftActor extends RaftActor {
82 private final DataPersistenceProvider dataPersistenceProvider;
83 private final RaftActor delegate;
85 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
86 private final Map<String, String> peerAddresses;
87 private final String id;
88 private final Optional<ConfigParams> config;
89 private final DataPersistenceProvider dataPersistenceProvider;
91 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
92 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
93 this.peerAddresses = peerAddresses;
96 this.dataPersistenceProvider = dataPersistenceProvider;
100 public MockRaftActor create() throws Exception {
101 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
105 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
107 private final List<Object> state;
109 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
110 super(id, peerAddresses, config);
111 state = new ArrayList<>();
112 this.delegate = mock(RaftActor.class);
113 if(dataPersistenceProvider == null){
114 this.dataPersistenceProvider = new PersistentDataProvider();
116 this.dataPersistenceProvider = dataPersistenceProvider;
120 public void waitForRecoveryComplete() {
122 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
123 } catch (InterruptedException e) {
128 public List<Object> getState() {
132 public static Props props(final String id, final Map<String, String> peerAddresses,
133 Optional<ConfigParams> config){
134 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
137 public static Props props(final String id, final Map<String, String> peerAddresses,
138 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
139 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
143 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
144 delegate.applyState(clientActor, identifier, data);
145 LOG.info("applyState called");
152 protected void startLogRecoveryBatch(int maxBatchSize) {
156 protected void appendRecoveredLogEntry(Payload data) {
161 protected void applyCurrentLogRecoveryBatch() {
165 protected void onRecoveryComplete() {
166 delegate.onRecoveryComplete();
167 recoveryComplete.countDown();
171 protected void applyRecoverySnapshot(ByteString snapshot) {
172 delegate.applyRecoverySnapshot(snapshot);
174 Object data = toObject(snapshot);
175 System.out.println("!!!!!applyRecoverySnapshot: "+data);
176 if (data instanceof List) {
177 state.addAll((List) data);
179 } catch (Exception e) {
184 @Override protected void createSnapshot() {
185 delegate.createSnapshot();
188 @Override protected void applySnapshot(ByteString snapshot) {
189 delegate.applySnapshot(snapshot);
192 @Override protected void onStateChanged() {
193 delegate.onStateChanged();
197 protected DataPersistenceProvider persistence() {
198 return this.dataPersistenceProvider;
201 @Override public String persistenceId() {
205 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
207 ByteArrayInputStream bis = null;
208 ObjectInputStream ois = null;
210 bis = new ByteArrayInputStream(bs.toByteArray());
211 ois = new ObjectInputStream(bis);
212 obj = ois.readObject();
224 public ReplicatedLog getReplicatedLog(){
225 return this.getRaftActorContext().getReplicatedLog();
231 private static class RaftActorTestKit extends JavaTestKit {
232 private final ActorRef raftActor;
234 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
237 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
238 Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
243 public ActorRef getRaftActor() {
247 public boolean waitForLogMessage(final Class logEventClass, String message){
248 // Wait for a specific log message to show up
250 new JavaTestKit.EventFilter<Boolean>(logEventClass
253 protected Boolean run() {
256 }.from(raftActor.path().toString())
258 .occurrences(1).exec();
263 protected void waitUntilLeader(){
264 waitUntilLeader(raftActor);
267 protected void waitUntilLeader(ActorRef actorRef) {
268 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
269 for(int i = 0; i < 20 * 5; i++) {
270 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
272 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
273 if(resp.getLeaderActor() != null) {
276 } catch(TimeoutException e) {
277 } catch(Exception e) {
278 System.err.println("FindLeader threw ex");
283 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
286 Assert.fail("Leader not found for actorRef " + actorRef.path());
293 public void testConstruction() {
294 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
298 public void testFindLeaderWhenLeaderIsSelf(){
299 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
300 kit.waitUntilLeader();
304 public void testRaftActorRecovery() throws Exception {
305 new JavaTestKit(getSystem()) {{
306 String persistenceId = "follower10";
308 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
309 // Set the heartbeat interval high to essentially disable election otherwise the test
310 // may fail if the actor is switched to Leader and the commitIndex is set to the last
312 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
314 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
315 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
317 watch(followerActor);
319 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
320 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
321 new MockRaftActorContext.MockPayload("E"));
322 snapshotUnappliedEntries.add(entry1);
324 int lastAppliedDuringSnapshotCapture = 3;
325 int lastIndexDuringSnapshotCapture = 4;
327 // 4 messages as part of snapshot, which are applied to state
328 ByteString snapshotBytes = fromObject(Arrays.asList(
329 new MockRaftActorContext.MockPayload("A"),
330 new MockRaftActorContext.MockPayload("B"),
331 new MockRaftActorContext.MockPayload("C"),
332 new MockRaftActorContext.MockPayload("D")));
334 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
335 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
336 lastAppliedDuringSnapshotCapture, 1);
337 MockSnapshotStore.setMockSnapshot(snapshot);
338 MockSnapshotStore.setPersistenceId(persistenceId);
340 // add more entries after snapshot is taken
341 List<ReplicatedLogEntry> entries = new ArrayList<>();
342 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
343 new MockRaftActorContext.MockPayload("F"));
344 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
345 new MockRaftActorContext.MockPayload("G"));
346 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
347 new MockRaftActorContext.MockPayload("H"));
352 int lastAppliedToState = 5;
355 MockAkkaJournal.addToJournal(5, entry2);
356 // 2 entries are applied to state besides the 4 entries in snapshot
357 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
358 MockAkkaJournal.addToJournal(7, entry3);
359 MockAkkaJournal.addToJournal(8, entry4);
362 followerActor.tell(PoisonPill.getInstance(), null);
363 expectMsgClass(duration("5 seconds"), Terminated.class);
365 unwatch(followerActor);
367 //reinstate the actor
368 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
369 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
370 Optional.<ConfigParams>of(config)));
372 ref.underlyingActor().waitForRecoveryComplete();
374 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
375 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
376 context.getReplicatedLog().size());
377 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
378 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
379 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
380 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
385 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
386 * process recovery messages
392 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
393 new JavaTestKit(getSystem()) {
395 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
397 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
399 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
401 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
402 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
404 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
406 // Wait for akka's recovery to complete so it doesn't interfere.
407 mockRaftActor.waitForRecoveryComplete();
409 ByteString snapshotBytes = fromObject(Arrays.asList(
410 new MockRaftActorContext.MockPayload("A"),
411 new MockRaftActorContext.MockPayload("B"),
412 new MockRaftActorContext.MockPayload("C"),
413 new MockRaftActorContext.MockPayload("D")));
415 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
416 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
418 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
420 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
422 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
424 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
426 assertEquals("add replicated log entry", 1, replicatedLog.size());
428 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
430 assertEquals("add replicated log entry", 2, replicatedLog.size());
432 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
434 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
436 // The snapshot had 4 items + we added 2 more items during the test
437 // We start removing from 5 and we should get 1 item in the replicated log
438 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
440 assertEquals("remove log entries", 1, replicatedLog.size());
442 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
444 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
445 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
447 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
449 mockActorRef.tell(PoisonPill.getInstance(), getRef());
455 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
456 * not process recovery messages
461 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
462 new JavaTestKit(getSystem()) {
464 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
466 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
468 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
470 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
471 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
473 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
475 // Wait for akka's recovery to complete so it doesn't interfere.
476 mockRaftActor.waitForRecoveryComplete();
478 ByteString snapshotBytes = fromObject(Arrays.asList(
479 new MockRaftActorContext.MockPayload("A"),
480 new MockRaftActorContext.MockPayload("B"),
481 new MockRaftActorContext.MockPayload("C"),
482 new MockRaftActorContext.MockPayload("D")));
484 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
485 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
487 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
489 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
491 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
493 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
495 assertEquals("add replicated log entry", 0, replicatedLog.size());
497 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
499 assertEquals("add replicated log entry", 0, replicatedLog.size());
501 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
503 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
505 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
507 assertEquals("remove log entries", 0, replicatedLog.size());
509 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
511 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
512 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
514 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
516 mockActorRef.tell(PoisonPill.getInstance(), getRef());
522 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
523 new JavaTestKit(getSystem()) {
525 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
527 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
529 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
531 CountDownLatch persistLatch = new CountDownLatch(1);
532 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
533 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
535 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
536 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
538 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
540 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
542 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
544 mockActorRef.tell(PoisonPill.getInstance(), getRef());
551 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
552 new JavaTestKit(getSystem()) {
554 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
556 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
558 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
561 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
563 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
564 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
566 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
568 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
570 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
572 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
574 mockActorRef.tell(PoisonPill.getInstance(), getRef());
581 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
582 new JavaTestKit(getSystem()) {
584 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
586 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
588 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
590 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
592 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
593 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
595 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
597 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
599 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
601 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
604 mockActorRef.tell(PoisonPill.getInstance(), getRef());
611 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
612 new JavaTestKit(getSystem()) {
614 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
616 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
618 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
620 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
622 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
623 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
625 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
627 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
629 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
631 mockActorRef.tell(PoisonPill.getInstance(), getRef());
638 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
639 new JavaTestKit(getSystem()) {
641 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
643 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
645 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
647 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
649 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
650 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
652 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
654 ByteString snapshotBytes = fromObject(Arrays.asList(
655 new MockRaftActorContext.MockPayload("A"),
656 new MockRaftActorContext.MockPayload("B"),
657 new MockRaftActorContext.MockPayload("C"),
658 new MockRaftActorContext.MockPayload("D")));
660 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
662 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
664 verify(dataPersistenceProvider).saveSnapshot(anyObject());
666 mockActorRef.tell(PoisonPill.getInstance(), getRef());
673 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
674 new JavaTestKit(getSystem()) {
676 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
678 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
680 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
682 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
684 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
685 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
687 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
689 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
690 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
691 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
692 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
693 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
695 ByteString snapshotBytes = fromObject(Arrays.asList(
696 new MockRaftActorContext.MockPayload("A"),
697 new MockRaftActorContext.MockPayload("B"),
698 new MockRaftActorContext.MockPayload("C"),
699 new MockRaftActorContext.MockPayload("D")));
701 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
703 verify(mockRaftActor.delegate).createSnapshot();
705 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
707 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
709 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
711 verify(dataPersistenceProvider).deleteMessages(100);
713 assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot());
715 assertEquals(2, mockRaftActor.getReplicatedLog().size());
717 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
718 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
720 // Index 2 will not be in the log because it was removed due to snapshotting
721 assertNull(mockRaftActor.getReplicatedLog().get(2));
723 mockActorRef.tell(PoisonPill.getInstance(), getRef());
730 public void testApplyState() throws Exception {
732 new JavaTestKit(getSystem()) {
734 String persistenceId = "testApplyState";
736 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
738 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
740 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
742 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
743 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
745 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
747 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
748 new MockRaftActorContext.MockPayload("F"));
750 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
752 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
754 mockActorRef.tell(PoisonPill.getInstance(), getRef());
763 public void testApplySnapshot() throws Exception {
764 new JavaTestKit(getSystem()) {
766 String persistenceId = "testApplySnapshot";
768 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
770 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
772 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
774 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
775 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
777 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
779 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
781 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
782 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
783 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class)));
785 ByteString snapshotBytes = fromObject(Arrays.asList(
786 new MockRaftActorContext.MockPayload("A"),
787 new MockRaftActorContext.MockPayload("B"),
788 new MockRaftActorContext.MockPayload("C"),
789 new MockRaftActorContext.MockPayload("D")));
791 Snapshot snapshot = mock(Snapshot.class);
793 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
795 doReturn(3L).when(snapshot).getLastAppliedIndex();
797 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
799 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
801 assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog());
803 assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied());
805 assertEquals(0, mockRaftActor.getReplicatedLog().size());
807 mockActorRef.tell(PoisonPill.getInstance(), getRef());
814 public void testSaveSnapshotFailure() throws Exception {
815 new JavaTestKit(getSystem()) {
817 String persistenceId = "testSaveSnapshotFailure";
819 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
821 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
823 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
825 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
826 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
828 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
830 ByteString snapshotBytes = fromObject(Arrays.asList(
831 new MockRaftActorContext.MockPayload("A"),
832 new MockRaftActorContext.MockPayload("B"),
833 new MockRaftActorContext.MockPayload("C"),
834 new MockRaftActorContext.MockPayload("D")));
836 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
838 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
840 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
843 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
844 mockRaftActor.getReplicatedLog().getSnapshotIndex());
846 assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot());
848 mockActorRef.tell(PoisonPill.getInstance(), getRef());
854 private ByteString fromObject(Object snapshot) throws Exception {
855 ByteArrayOutputStream b = null;
856 ObjectOutputStream o = null;
858 b = new ByteArrayOutputStream();
859 o = new ObjectOutputStream(b);
860 o.writeObject(snapshot);
861 byte[] snapshotBytes = b.toByteArray();
862 return ByteString.copyFrom(snapshotBytes);