1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
35 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
36 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
37 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
38 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
39 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
40 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
44 import scala.concurrent.duration.FiniteDuration;
46 import java.io.ByteArrayInputStream;
47 import java.io.ByteArrayOutputStream;
48 import java.io.IOException;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.util.ArrayList;
52 import java.util.Arrays;
53 import java.util.Collections;
54 import java.util.List;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
60 import static org.junit.Assert.assertEquals;
61 import static org.junit.Assert.assertNotEquals;
62 import static org.junit.Assert.assertNotNull;
63 import static org.junit.Assert.assertNull;
64 import static org.junit.Assert.assertTrue;
65 import static org.mockito.Matchers.any;
66 import static org.mockito.Matchers.anyObject;
67 import static org.mockito.Matchers.eq;
68 import static org.mockito.Mockito.doReturn;
69 import static org.mockito.Mockito.mock;
70 import static org.mockito.Mockito.times;
71 import static org.mockito.Mockito.verify;
73 public class RaftActorTest extends AbstractActorTest {
// Resets the static test fixtures shared between tests: empties the MockAkkaJournal
// and clears the snapshot held by MockSnapshotStore.
// NOTE(review): presumably annotated with @After (org.junit.After is imported); the
// annotation line is not visible in this listing - confirm.
77 public void tearDown() {
78 MockAkkaJournal.clearJournal();
79 MockSnapshotStore.setMockSnapshot(null);
// RaftActor subclass used as the actor under test. Its callback overrides forward to a
// Mockito mock ("delegate") so tests can verify which RaftActor hooks were invoked, and
// elements of recovered snapshots are collected in "state".
82 public static class MockRaftActor extends RaftActor {
// Provider returned by persistence(); either the one injected through the constructor
// or a PersistentDataProvider when null was passed.
84 private final DataPersistenceProvider dataPersistenceProvider;
// Mockito mock of RaftActor that receives the forwarded callbacks.
85 private final RaftActor delegate;
// Akka Creator that captures the constructor arguments and builds the MockRaftActor
// from them in create().
87 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
88 private static final long serialVersionUID = 1L;
89 private final Map<String, String> peerAddresses;
90 private final String id;
91 private final Optional<ConfigParams> config;
92 private final DataPersistenceProvider dataPersistenceProvider;
94 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
95 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
96 this.peerAddresses = peerAddresses;
99 this.dataPersistenceProvider = dataPersistenceProvider;
103 public MockRaftActor create() throws Exception {
104 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
// Counted down once in onRecoveryComplete(); awaited by waitForRecoveryComplete().
108 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Receives the elements of any List deserialized in applyRecoverySnapshot().
110 private final List<Object> state;
// Creates the actor with a fresh state list and a mock delegate. A null
// dataPersistenceProvider selects a new PersistentDataProvider instead.
112 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
113 super(id, peerAddresses, config);
114 state = new ArrayList<>();
115 this.delegate = mock(RaftActor.class);
116 if(dataPersistenceProvider == null){
117 this.dataPersistenceProvider = new PersistentDataProvider();
119 this.dataPersistenceProvider = dataPersistenceProvider;
// Blocks for up to 5 seconds until onRecoveryComplete() has run, and fails the
// assertion if the latch was not released in that time.
123 public void waitForRecoveryComplete() {
125 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
126 } catch (InterruptedException e) {
// NOTE(review): presumably returns the "state" list; the body is not visible here.
131 public List<Object> getState() {
// Props for a MockRaftActor without an explicit DataPersistenceProvider (null is
// passed, so the constructor falls back to PersistentDataProvider).
135 public static Props props(final String id, final Map<String, String> peerAddresses,
136 Optional<ConfigParams> config){
137 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
// Props for a MockRaftActor that uses the supplied DataPersistenceProvider.
140 public static Props props(final String id, final Map<String, String> peerAddresses,
141 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
142 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
// Forwards the call to the mock delegate and logs that it happened.
146 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
147 delegate.applyState(clientActor, identifier, data);
148 LOG.info("applyState called");
// The bodies of the next three recovery hooks are not visible in this listing.
155 protected void startLogRecoveryBatch(int maxBatchSize) {
159 protected void appendRecoveredLogEntry(Payload data) {
164 protected void applyCurrentLogRecoveryBatch() {
// Notifies the delegate, then releases anyone blocked in waitForRecoveryComplete().
168 protected void onRecoveryComplete() {
169 delegate.onRecoveryComplete();
170 recoveryComplete.countDown();
// Forwards to the delegate, then deserializes the snapshot bytes and, when the result
// is a List, appends its elements to "state". Also prints the decoded value to stdout.
174 protected void applyRecoverySnapshot(ByteString snapshot) {
175 delegate.applyRecoverySnapshot(snapshot);
177 Object data = toObject(snapshot);
178 System.out.println("!!!!!applyRecoverySnapshot: "+data);
179 if (data instanceof List) {
180 state.addAll((List<?>) data);
182 } catch (Exception e) {
// Pure pass-through to the delegate so tests can verify the call.
187 @Override protected void createSnapshot() {
188 delegate.createSnapshot();
// Pure pass-through to the delegate so tests can verify the call.
191 @Override protected void applySnapshot(ByteString snapshot) {
192 delegate.applySnapshot(snapshot);
// Pure pass-through to the delegate so tests can verify the call.
195 @Override protected void onStateChanged() {
196 delegate.onStateChanged();
// Returns the provider chosen in the constructor.
200 protected DataPersistenceProvider persistence() {
201 return this.dataPersistenceProvider;
// NOTE(review): the returned value is not visible in this listing.
204 @Override public String persistenceId() {
// Deserializes a Java-serialized object from the given ByteString using an
// ObjectInputStream over its bytes.
// NOTE(review): stream cleanup is not visible in this listing - confirm it exists.
208 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
210 ByteArrayInputStream bis = null;
211 ObjectInputStream ois = null;
213 bis = new ByteArrayInputStream(bs.toByteArray());
214 ois = new ObjectInputStream(bis);
215 obj = ois.readObject();
// Convenience accessor for the replicated log held by the actor's RaftActorContext.
227 public ReplicatedLog getReplicatedLog(){
228 return this.getRaftActorContext().getReplicatedLog();
// JavaTestKit that owns a single MockRaftActor and offers helpers for waiting on a log
// message and on leader election.
234 private static class RaftActorTestKit extends JavaTestKit {
235 private final ActorRef raftActor;
// Starts a MockRaftActor named actorName with no peers and no explicit config.
237 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
240 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
241 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
// NOTE(review): presumably returns raftActor; the body is not visible here.
246 public ActorRef getRaftActor() {
// Runs an EventFilter for logEventClass restricted to events originating from the
// raft actor's path and expecting exactly one occurrence.
// NOTE(review): how "message" is matched and what is returned is not visible here.
250 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
251 // Wait for a specific log message to show up
253 new JavaTestKit.EventFilter<Boolean>(logEventClass
256 protected Boolean run() {
259 }.from(raftActor.path().toString())
261 .occurrences(1).exec();
// Waits until the kit's own raft actor reports a leader.
266 protected void waitUntilLeader(){
267 waitUntilLeader(raftActor);
// Polls actorRef with FindLeader up to 100 times (20 * 5), using a 100 ms ask timeout
// per attempt and a 50 ms pause, and fails the test if no attempt yields a non-null
// leader. A TimeoutException from the ask is deliberately ignored; any other
// exception is reported on stderr.
// NOTE(review): the success branch (presumably an early return) is not visible here.
270 protected void waitUntilLeader(ActorRef actorRef) {
271 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
272 for(int i = 0; i < 20 * 5; i++) {
273 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
275 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
276 if(resp.getLeaderActor() != null) {
279 } catch(TimeoutException e) {
280 } catch(Exception e) {
281 System.err.println("FindLeader threw ex");
286 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
289 Assert.fail("Leader not found for actorRef " + actorRef.path());
// Smoke test: a freshly created MockRaftActor with no peers must report a leader
// before waitUntilLeader() gives up.
296 public void testConstruction() {
297 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
// Verifies that a peerless actor answers FindLeader with a non-null leader, by way of
// waitUntilLeader() (which fails the test otherwise).
301 public void testFindLeaderWhenLeaderIsSelf(){
302 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
303 kit.waitUntilLeader();
// End-to-end recovery test. Seeds MockSnapshotStore with a snapshot (payloads A-D plus
// one unapplied entry "E") and MockAkkaJournal with later entries and an
// ApplyLogEntries marker, stops the actor, re-creates it under the same persistence id
// and checks the recovered log size, last index, last applied, commit index and the
// number of recovered state items.
// NOTE(review): the statements that populate "entries" and define "lastIndex" are not
// visible in this listing.
307 public void testRaftActorRecovery() throws Exception {
308 new JavaTestKit(getSystem()) {{
309 String persistenceId = "follower10";
311 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
312 // Set the heartbeat interval high to essentially disable election otherwise the test
313 // may fail if the actor is switched to Leader and the commitIndex is set to the last
315 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
317 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
318 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
// Watch the actor so its termination can be awaited below.
320 watch(followerActor);
322 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
323 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
324 new MockRaftActorContext.MockPayload("E"));
325 snapshotUnappliedEntries.add(entry1);
327 int lastAppliedDuringSnapshotCapture = 3;
328 int lastIndexDuringSnapshotCapture = 4;
330 // 4 messages as part of snapshot, which are applied to state
331 ByteString snapshotBytes = fromObject(Arrays.asList(
332 new MockRaftActorContext.MockPayload("A"),
333 new MockRaftActorContext.MockPayload("B"),
334 new MockRaftActorContext.MockPayload("C"),
335 new MockRaftActorContext.MockPayload("D")));
337 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
338 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
339 lastAppliedDuringSnapshotCapture, 1);
340 MockSnapshotStore.setMockSnapshot(snapshot);
341 MockSnapshotStore.setPersistenceId(persistenceId);
343 // add more entries after snapshot is taken
344 List<ReplicatedLogEntry> entries = new ArrayList<>();
345 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
346 new MockRaftActorContext.MockPayload("F"));
347 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
348 new MockRaftActorContext.MockPayload("G"));
349 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
350 new MockRaftActorContext.MockPayload("H"));
355 int lastAppliedToState = 5;
358 MockAkkaJournal.addToJournal(5, entry2);
359 // 2 entries are applied to state besides the 4 entries in snapshot
360 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
361 MockAkkaJournal.addToJournal(7, entry3);
362 MockAkkaJournal.addToJournal(8, entry4);
// Stop the first incarnation and wait for its Terminated notification.
365 followerActor.tell(PoisonPill.getInstance(), null);
366 expectMsgClass(duration("5 seconds"), Terminated.class);
368 unwatch(followerActor);
370 //reinstate the actor
371 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
372 MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
373 Optional.<ConfigParams>of(config)));
375 ref.underlyingActor().waitForRecoveryComplete();
377 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
378 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
379 context.getReplicatedLog().size());
380 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
381 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
382 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
383 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
388 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
389 * process recovery messages
// Feeds recovery messages directly into onReceiveRecover() on an actor created without
// an explicit DataPersistenceProvider and asserts that each one takes effect: the
// snapshot reaches the delegate, log entries are added, the commit index moves,
// entries are deleted and the election term is updated.
395 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
396 new JavaTestKit(getSystem()) {
398 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
400 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
402 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
404 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
405 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
407 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
409 // Wait for akka's recovery to complete so it doesn't interfere.
410 mockRaftActor.waitForRecoveryComplete();
412 ByteString snapshotBytes = fromObject(Arrays.asList(
413 new MockRaftActorContext.MockPayload("A"),
414 new MockRaftActorContext.MockPayload("B"),
415 new MockRaftActorContext.MockPayload("C"),
416 new MockRaftActorContext.MockPayload("D")));
418 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
419 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
421 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
// The offered snapshot bytes must have been handed to applyRecoverySnapshot.
423 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
425 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
427 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
429 assertEquals("add replicated log entry", 1, replicatedLog.size());
431 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
433 assertEquals("add replicated log entry", 2, replicatedLog.size());
435 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
437 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
439 // The snapshot had 4 items + we added 2 more items during the test
440 // We start removing from 5 and we should get 1 item in the replicated log
441 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
443 assertEquals("remove log entries", 1, replicatedLog.size());
445 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
447 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
448 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
450 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Stop the actor at the end of the test.
452 mockActorRef.tell(PoisonPill.getInstance(), getRef());
458 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
459 * not process recovery messages
// Sends the same kinds of recovery messages as the "applicable" case, but to an actor
// backed by a DataPersistenceProviderMonitor, and asserts that none of them takes
// effect: no snapshot is applied, the log stays empty, the commit index stays at -1
// and the election term / voted-for values are not the ones sent.
464 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
465 new JavaTestKit(getSystem()) {
467 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
469 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
471 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
473 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
474 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
476 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
478 // Wait for akka's recovery to complete so it doesn't interfere.
479 mockRaftActor.waitForRecoveryComplete();
481 ByteString snapshotBytes = fromObject(Arrays.asList(
482 new MockRaftActorContext.MockPayload("A"),
483 new MockRaftActorContext.MockPayload("B"),
484 new MockRaftActorContext.MockPayload("C"),
485 new MockRaftActorContext.MockPayload("D")));
487 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
488 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
490 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
// The snapshot offer must have been ignored.
492 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
494 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
496 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
498 assertEquals("add replicated log entry", 0, replicatedLog.size());
500 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
502 assertEquals("add replicated log entry", 0, replicatedLog.size());
504 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
506 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
508 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
510 assertEquals("remove log entries", 0, replicatedLog.size());
512 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
514 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
515 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
517 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Stop the actor at the end of the test.
519 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that updateAndPersist() on the term information goes through the actor's
// DataPersistenceProvider: a latch registered on the DataPersistenceProviderMonitor
// must be released within 5 seconds.
525 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
526 new JavaTestKit(getSystem()) {
528 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
530 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
532 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
534 CountDownLatch persistLatch = new CountDownLatch(1);
535 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
536 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
538 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
539 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
541 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
543 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
545 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor at the end of the test.
547 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that appendAndPersist() on the replicated log calls persist() on the
// (mocked) DataPersistenceProvider with exactly the appended entry.
554 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
555 new JavaTestKit(getSystem()) {
557 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
559 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
561 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
563 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
565 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
566 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
568 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
570 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
572 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
574 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
// Stop the actor at the end of the test.
576 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that removeFromAndPersist() goes through the DataPersistenceProvider:
// after one appendAndPersist() and one removeFromAndPersist(), persist() must have
// been called twice on the mock.
583 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
584 new JavaTestKit(getSystem()) {
586 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
588 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
590 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
592 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
594 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
595 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
597 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
599 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
601 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
603 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
// Stop the actor at the end of the test.
605 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that handling an ApplyLogEntries command results in exactly one persist()
// call on the (mocked) DataPersistenceProvider.
612 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
613 new JavaTestKit(getSystem()) {
615 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
617 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
619 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
621 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
623 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
624 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
626 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
628 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
630 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
// Stop the actor at the end of the test.
632 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that a CaptureSnapshotReply, sent after a CaptureSnapshot while the actor
// runs the Leader behavior, causes saveSnapshot() to be called on the (mocked)
// DataPersistenceProvider.
639 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
640 new JavaTestKit(getSystem()) {
642 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
644 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
646 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
648 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
650 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
651 MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
652 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
654 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
656 ByteString snapshotBytes = fromObject(Arrays.asList(
657 new MockRaftActorContext.MockPayload("A"),
658 new MockRaftActorContext.MockPayload("B"),
659 new MockRaftActorContext.MockPayload("C"),
660 new MockRaftActorContext.MockPayload("D")));
662 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
664 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
666 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
668 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
670 verify(dataPersistenceProvider).saveSnapshot(anyObject());
// Stop the actor at the end of the test.
672 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Runs the capture -> reply -> SaveSnapshotSuccess sequence on a Follower holding five
// log entries and verifies that: createSnapshot() is invoked on the delegate, the
// provider is asked to delete old snapshots and journal messages up to sequence number
// 100, and the replicated log has been trimmed to the two entries at 3 and 4.
679 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
680 new JavaTestKit(getSystem()) {
682 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
684 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
686 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
688 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
690 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
691 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
693 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
// Populate the in-memory log directly (append, not appendAndPersist).
695 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
696 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
697 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
698 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
699 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
701 ByteString snapshotBytes = fromObject(Arrays.asList(
702 new MockRaftActorContext.MockPayload("A"),
703 new MockRaftActorContext.MockPayload("B"),
704 new MockRaftActorContext.MockPayload("C"),
705 new MockRaftActorContext.MockPayload("D")));
707 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
708 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
710 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
712 verify(mockRaftActor.delegate).createSnapshot();
714 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
716 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
718 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
720 verify(dataPersistenceProvider).deleteMessages(100);
722 assertEquals(2, mockRaftActor.getReplicatedLog().size());
724 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
725 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
727 // Index 2 will not be in the log because it was removed due to snapshotting
728 assertNull(mockRaftActor.getReplicatedLog().get(2));
// Stop the actor at the end of the test.
730 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that an ApplyState command is dispatched to the applyState() hook with the
// same client actor and identifier that were carried by the message.
737 public void testApplyState() throws Exception {
739 new JavaTestKit(getSystem()) {
741 String persistenceId = "testApplyState";
743 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
745 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
747 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
749 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
750 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
752 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
754 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
755 new MockRaftActorContext.MockPayload("F"));
757 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
759 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
// Stop the actor at the end of the test.
761 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies the effect of an ApplySnapshot command carrying a mocked Snapshot: the
// snapshot bytes reach the applySnapshot() hook, the actor ends up with a different
// (empty) ReplicatedLog instance, and lastApplied equals the snapshot's last applied
// index (3).
768 public void testApplySnapshot() throws Exception {
769 new JavaTestKit(getSystem()) {
771 String persistenceId = "testApplySnapshot";
773 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
775 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
777 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
779 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
780 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
782 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
// Keep a reference to the pre-snapshot log so identity can be compared afterwards.
784 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
786 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
787 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
788 oldReplicatedLog.append(
789 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
790 mock(Payload.class)));
792 ByteString snapshotBytes = fromObject(Arrays.asList(
793 new MockRaftActorContext.MockPayload("A"),
794 new MockRaftActorContext.MockPayload("B"),
795 new MockRaftActorContext.MockPayload("C"),
796 new MockRaftActorContext.MockPayload("D")));
// Only getState() and getLastAppliedIndex() are stubbed on the mocked snapshot.
798 Snapshot snapshot = mock(Snapshot.class);
800 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
802 doReturn(3L).when(snapshot).getLastAppliedIndex();
804 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
806 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
808 assertTrue("The replicatedLog should have changed",
809 oldReplicatedLog != mockRaftActor.getReplicatedLog());
811 assertEquals("lastApplied should be same as in the snapshot",
812 (Long) 3L, mockRaftActor.getLastApplied());
814 assertEquals(0, mockRaftActor.getReplicatedLog().size());
// Stop the actor at the end of the test.
816 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Verifies that a SaveSnapshotFailure following a capture / reply sequence on a Leader
// leaves the replicated log's snapshot index at -1.
// NOTE(review): the second constructor argument of SaveSnapshotFailure is on a line
// that is not visible in this listing.
823 public void testSaveSnapshotFailure() throws Exception {
824 new JavaTestKit(getSystem()) {
826 String persistenceId = "testSaveSnapshotFailure";
828 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
830 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
832 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
834 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
835 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
837 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
839 ByteString snapshotBytes = fromObject(Arrays.asList(
840 new MockRaftActorContext.MockPayload("A"),
841 new MockRaftActorContext.MockPayload("B"),
842 new MockRaftActorContext.MockPayload("C"),
843 new MockRaftActorContext.MockPayload("D")));
845 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
847 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
849 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
851 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
853 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
856 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
857 mockRaftActor.getReplicatedLog().getSnapshotIndex());
// Stop the actor at the end of the test.
859 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Java-serializes the given object with an ObjectOutputStream and returns the resulting
// bytes as a ByteString. Used by the tests to build snapshot payloads.
// NOTE(review): the try/finally and stream cleanup are not visible in this listing -
// confirm the streams are closed.
865 private ByteString fromObject(Object snapshot) throws Exception {
866 ByteArrayOutputStream b = null;
867 ObjectOutputStream o = null;
869 b = new ByteArrayOutputStream();
870 o = new ObjectOutputStream(b);
871 o.writeObject(snapshot);
// The byte array is read from the underlying buffer right after writeObject().
872 byte[] snapshotBytes = b.toByteArray();
873 return ByteString.copyFrom(snapshotBytes);