1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
35 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
36 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
37 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
38 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
39 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
40 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
44 import scala.concurrent.duration.FiniteDuration;
46 import java.io.ByteArrayInputStream;
47 import java.io.ByteArrayOutputStream;
48 import java.io.IOException;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.util.ArrayList;
52 import java.util.Arrays;
53 import java.util.Collections;
54 import java.util.List;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
60 import static org.junit.Assert.assertEquals;
61 import static org.junit.Assert.assertNotEquals;
62 import static org.junit.Assert.assertNotNull;
63 import static org.junit.Assert.assertNull;
64 import static org.junit.Assert.assertTrue;
65 import static org.mockito.Matchers.any;
66 import static org.mockito.Matchers.anyObject;
67 import static org.mockito.Matchers.eq;
68 import static org.mockito.Mockito.doReturn;
69 import static org.mockito.Mockito.mock;
70 import static org.mockito.Mockito.times;
71 import static org.mockito.Mockito.verify;
73 public class RaftActorTest extends AbstractActorTest {
// Per-test cleanup: empties the MockAkkaJournal and resets the MockSnapshotStore's mock
// snapshot to null (presumably so data seeded by one test is not recovered by the next).
77 public void tearDown() {
78 MockAkkaJournal.clearJournal();
79 MockSnapshotStore.setMockSnapshot(null);
// RaftActor subclass used as the actor under test. Its overridden callbacks forward to a
// Mockito mock ("delegate") so tests can verify() that RaftActor invoked them, the
// DataPersistenceProvider can be injected, and a latch signals when recovery has completed.
82 public static class MockRaftActor extends RaftActor {
// Provider returned by persistence(); selected in the constructor.
84 private final DataPersistenceProvider dataPersistenceProvider;
// Mockito mock of RaftActor; the callbacks below forward to it.
85 private final RaftActor delegate;
// Akka Creator that captures the constructor arguments and builds the MockRaftActor; used by props().
87 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
88 private final Map<String, String> peerAddresses;
89 private final String id;
90 private final Optional<ConfigParams> config;
91 private final DataPersistenceProvider dataPersistenceProvider;
93 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
94 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
95 this.peerAddresses = peerAddresses;
98 this.dataPersistenceProvider = dataPersistenceProvider;
// Builds the actor from the captured id, peer addresses, config and persistence provider.
102 public MockRaftActor create() throws Exception {
103 return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider);
// Counted down in onRecoveryComplete(); awaited by waitForRecoveryComplete().
107 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Elements collected from recovery snapshots by applyRecoverySnapshot().
109 private final List<Object> state;
// Creates the actor with an empty state list and a fresh RaftActor mock as delegate.
// Falls back to a new PersistentDataProvider when the dataPersistenceProvider argument is null.
111 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
112 super(id, peerAddresses, config);
113 state = new ArrayList<>();
114 this.delegate = mock(RaftActor.class);
115 if(dataPersistenceProvider == null){
116 this.dataPersistenceProvider = new PersistentDataProvider();
118 this.dataPersistenceProvider = dataPersistenceProvider;
// Waits up to 5 seconds on the recoveryComplete latch and asserts that it was released in time.
122 public void waitForRecoveryComplete() {
124 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
125 } catch (InterruptedException e) {
// Accessor for the state collected during recovery (presumably returns the state list - confirm).
130 public List<Object> getState() {
// Props for an actor without an explicit persistence provider: passes null, so the
// constructor falls back to a PersistentDataProvider.
134 public static Props props(final String id, final Map<String, String> peerAddresses,
135 Optional<ConfigParams> config){
136 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null));
// Props for an actor that uses the supplied DataPersistenceProvider.
139 public static Props props(final String id, final Map<String, String> peerAddresses,
140 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
141 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider));
// Forwards the call to the delegate mock, then logs that applyState was invoked.
145 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
146 delegate.applyState(clientActor, identifier, data);
147 LOG.info("applyState called");
// Log-recovery batch callbacks (presumably RaftActor hooks - confirm).
154 protected void startLogRecoveryBatch(int maxBatchSize) {
158 protected void appendRecoveredLogEntry(Payload data) {
163 protected void applyCurrentLogRecoveryBatch() {
// Notifies the delegate mock first, then releases the recoveryComplete latch.
167 protected void onRecoveryComplete() {
168 delegate.onRecoveryComplete();
169 recoveryComplete.countDown();
// Forwards the snapshot to the delegate mock, then deserializes it with toObject(); when the
// result is a List its elements are appended to state.
// NOTE(review): the System.out debug print below should go through LOG or be removed.
// NOTE(review): catch (Exception) is broad - confirm the failure is reported, not swallowed.
173 protected void applyRecoverySnapshot(ByteString snapshot) {
174 delegate.applyRecoverySnapshot(snapshot);
176 Object data = toObject(snapshot);
177 System.out.println("!!!!!applyRecoverySnapshot: "+data);
178 if (data instanceof List) {
179 state.addAll((List<?>) data);
181 } catch (Exception e) {
// Forwards to the delegate mock so tests can verify that a snapshot was requested.
186 @Override protected void createSnapshot() {
187 delegate.createSnapshot();
// Forwards to the delegate mock so tests can verify which snapshot bytes were applied.
190 @Override protected void applySnapshot(ByteString snapshot) {
191 delegate.applySnapshot(snapshot);
// Forwards to the delegate mock.
194 @Override protected void onStateChanged() {
195 delegate.onStateChanged();
// Returns the provider selected in the constructor.
199 protected DataPersistenceProvider persistence() {
200 return this.dataPersistenceProvider;
203 @Override public String persistenceId() {
// Deserializes one object from the ByteString's bytes using Java serialization
// (ByteArrayInputStream wrapped in an ObjectInputStream).
// NOTE(review): the streams are declared up front and assigned later, which suggests manual
// cleanup; try-with-resources would be simpler - confirm against the full method.
207 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
209 ByteArrayInputStream bis = null;
210 ObjectInputStream ois = null;
212 bis = new ByteArrayInputStream(bs.toByteArray());
213 ois = new ObjectInputStream(bis);
214 obj = ois.readObject();
// Convenience accessor for the replicated log held by this actor's RaftActorContext.
226 public ReplicatedLog getReplicatedLog(){
227 return this.getRaftActorContext().getReplicatedLog();
// JavaTestKit that starts a MockRaftActor under a given name and offers helpers for waiting
// on a log message and on a leader being reported.
233 private static class RaftActorTestKit extends JavaTestKit {
// The MockRaftActor started by the constructor.
234 private final ActorRef raftActor;
// Starts a MockRaftActor named actorName with an empty peer map and an absent ConfigParams.
// NOTE(review): Collections.EMPTY_MAP is a raw type; Collections.<String, String>emptyMap()
// would avoid the unchecked conversion.
236 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
239 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
240 Collections.EMPTY_MAP, Optional.<ConfigParams>absent()), actorName);
245 public ActorRef getRaftActor() {
// Runs an EventFilter for logEventClass limited to events originating from the raft actor's
// path and expecting exactly one occurrence.
249 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
250 // Wait for a specific log message to show up
252 new JavaTestKit.EventFilter<Boolean>(logEventClass
255 protected Boolean run() {
258 }.from(raftActor.path().toString())
260 .occurrences(1).exec();
// Waits for the actor created by this kit to report a leader.
265 protected void waitUntilLeader(){
266 waitUntilLeader(raftActor);
// Polls actorRef with FindLeader, using a 100 ms ask/await timeout, for at most 20 * 5
// attempts with a 50 ms pause, checking whether the reply carries a leader actor.
// Fails the test with "Leader not found" if no attempt yields one.
// NOTE(review): non-timeout exceptions are only reported via System.err, without the cause.
269 protected void waitUntilLeader(ActorRef actorRef) {
270 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
271 for(int i = 0; i < 20 * 5; i++) {
272 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
274 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
275 if(resp.getLeaderActor() != null) {
278 } catch(TimeoutException e) {
279 } catch(Exception e) {
280 System.err.println("FindLeader threw ex");
285 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
288 Assert.fail("Leader not found for actorRef " + actorRef.path());
// Starts a MockRaftActor named "testConstruction" through RaftActorTestKit and waits until
// it reports a leader; the test fails if none is reported.
295 public void testConstruction() {
296 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
// Starts an actor named "testFindLeader" with an empty peer map and waits until a FindLeader
// request returns a leader.
300 public void testFindLeaderWhenLeaderIsSelf(){
301 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
302 kit.waitUntilLeader();
// Recovery from snapshot plus journal: seeds MockSnapshotStore with a snapshot (state A-D and
// one unapplied entry at index 4) and MockAkkaJournal with three more entries and an
// ApplyLogEntries(5), stops the first actor, creates a new MockRaftActor with the same id and
// checks the recovered log size, last index, last applied, commit index and state size.
306 public void testRaftActorRecovery() throws Exception {
307 new JavaTestKit(getSystem()) {{
308 String persistenceId = "follower10";
310 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
311 // Set the heartbeat interval high to essentially disable election otherwise the test
312 // may fail if the actor is switched to Leader and the commitIndex is set to the last
314 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
316 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
317 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
// Watch the actor so the Terminated message can be awaited after the PoisonPill below.
319 watch(followerActor);
321 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
322 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
323 new MockRaftActorContext.MockPayload("E"));
324 snapshotUnappliedEntries.add(entry1);
326 int lastAppliedDuringSnapshotCapture = 3;
327 int lastIndexDuringSnapshotCapture = 4;
329 // 4 messages as part of snapshot, which are applied to state
330 ByteString snapshotBytes = fromObject(Arrays.asList(
331 new MockRaftActorContext.MockPayload("A"),
332 new MockRaftActorContext.MockPayload("B"),
333 new MockRaftActorContext.MockPayload("C"),
334 new MockRaftActorContext.MockPayload("D")));
336 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
337 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
338 lastAppliedDuringSnapshotCapture, 1);
// Make the mock snapshot store offer this snapshot for the persistence id used here.
339 MockSnapshotStore.setMockSnapshot(snapshot);
340 MockSnapshotStore.setPersistenceId(persistenceId);
342 // add more entries after snapshot is taken
// NOTE(review): confirm entry2-entry4 are added to entries; the log-size assertion depends on it.
343 List<ReplicatedLogEntry> entries = new ArrayList<>();
344 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
345 new MockRaftActorContext.MockPayload("F"));
346 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
347 new MockRaftActorContext.MockPayload("G"));
348 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
349 new MockRaftActorContext.MockPayload("H"));
354 int lastAppliedToState = 5;
357 MockAkkaJournal.addToJournal(5, entry2);
358 // 2 entries are applied to state besides the 4 entries in snapshot
359 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
360 MockAkkaJournal.addToJournal(7, entry3);
361 MockAkkaJournal.addToJournal(8, entry4);
// Stop the first actor and wait for its termination before recreating it.
// NOTE(review): prefer ActorRef.noSender() over a literal null sender.
364 followerActor.tell(PoisonPill.getInstance(), null);
365 expectMsgClass(duration("5 seconds"), Terminated.class);
367 unwatch(followerActor);
369 //reinstate the actor
370 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
371 MockRaftActor.props(persistenceId, Collections.EMPTY_MAP,
372 Optional.<ConfigParams>of(config)));
374 ref.underlyingActor().waitForRecoveryComplete();
// Verify what recovery rebuilt from the seeded snapshot and journal.
376 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
377 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
378 context.getReplicatedLog().size());
379 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
380 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
381 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
382 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
387 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
388 * process recovery messages
// Feeds recovery messages directly into onReceiveRecover() on an actor created without an
// explicit provider (so MockRaftActor uses a PersistentDataProvider) and checks that each one
// takes effect: snapshot offer, two log entries, ApplyLogEntries, DeleteEntries and
// UpdateElectionTerm.
394 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
395 new JavaTestKit(getSystem()) {
397 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
399 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
401 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
403 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
404 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
406 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
408 // Wait for akka's recovery to complete so it doesn't interfere.
409 mockRaftActor.waitForRecoveryComplete();
411 ByteString snapshotBytes = fromObject(Arrays.asList(
412 new MockRaftActorContext.MockPayload("A"),
413 new MockRaftActorContext.MockPayload("B"),
414 new MockRaftActorContext.MockPayload("C"),
415 new MockRaftActorContext.MockPayload("D")));
417 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
418 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
// A snapshot offer must reach applyRecoverySnapshot with the same bytes.
420 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
422 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
// Each recovered log entry grows the replicated log by one.
424 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
426 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
428 assertEquals("add replicated log entry", 1, replicatedLog.size());
430 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
432 assertEquals("add replicated log entry", 2, replicatedLog.size());
// ApplyLogEntries(1) is expected to move the commit index to 1.
434 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
436 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
438 // The snapshot had 4 items + we added 2 more items during the test
439 // We start removing from 5 and we should get 1 item in the replicated log
440 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
442 assertEquals("remove log entries", 1, replicatedLog.size());
// UpdateElectionTerm must be reflected in the context's term information.
444 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
446 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
447 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
449 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Stop the actor at the end of the test.
451 mockActorRef.tell(PoisonPill.getInstance(), getRef());
457 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
458 * not process recovery messages
// Counterpart of the "applicable" test: the actor is created with a
// DataPersistenceProviderMonitor, the same recovery messages are fed into onReceiveRecover(),
// and the assertions check that none of them changed the actor's state.
463 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
464 new JavaTestKit(getSystem()) {
466 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
468 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
470 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
472 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
473 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
475 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
477 // Wait for akka's recovery to complete so it doesn't interfere.
478 mockRaftActor.waitForRecoveryComplete();
480 ByteString snapshotBytes = fromObject(Arrays.asList(
481 new MockRaftActorContext.MockPayload("A"),
482 new MockRaftActorContext.MockPayload("B"),
483 new MockRaftActorContext.MockPayload("C"),
484 new MockRaftActorContext.MockPayload("D")));
486 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
487 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
// The snapshot offer must not reach applyRecoverySnapshot at all.
489 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
491 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
// Recovered log entries must leave the replicated log empty.
493 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
495 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
497 assertEquals("add replicated log entry", 0, replicatedLog.size());
499 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
501 assertEquals("add replicated log entry", 0, replicatedLog.size());
// The commit index is expected to remain -1.
503 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
505 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
507 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
509 assertEquals("remove log entries", 0, replicatedLog.size());
// Term information must not pick up the recovered term or vote.
511 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
513 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
514 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
516 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Stop the actor at the end of the test.
518 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Checks that updateAndPersist() on the term information goes through the injected
// DataPersistenceProviderMonitor: the latch registered via setPersistLatch must be released
// within 5 seconds.
524 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
525 new JavaTestKit(getSystem()) {
527 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
529 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
531 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
533 CountDownLatch persistLatch = new CountDownLatch(1);
534 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
535 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
537 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
538 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
540 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
542 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
544 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Stop the actor at the end of the test.
546 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Checks that appendAndPersist() on the replicated log calls persist() on the injected
// (mocked) DataPersistenceProvider with the appended entry.
553 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
554 new JavaTestKit(getSystem()) {
556 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
558 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
560 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
562 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
564 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
565 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
567 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
569 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
571 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
573 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
// Stop the actor at the end of the test.
575 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Appends one entry with appendAndPersist() and removes it with removeFromAndPersist(0), then
// expects exactly two persist() calls on the mocked provider.
582 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
583 new JavaTestKit(getSystem()) {
585 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
587 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
589 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
591 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
593 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
594 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
596 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
598 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
600 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
602 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
// Stop the actor at the end of the test.
604 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Sends ApplyLogEntries(10) to onReceiveCommand() and expects exactly one persist() call on
// the mocked DataPersistenceProvider.
611 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
612 new JavaTestKit(getSystem()) {
614 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
616 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
618 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
620 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
622 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
623 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
625 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
627 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
629 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
// Stop the actor at the end of the test.
631 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Sends a CaptureSnapshot, switches the actor to the Leader behavior, then sends a
// CaptureSnapshotReply and expects saveSnapshot() to be called on the mocked provider.
638 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
639 new JavaTestKit(getSystem()) {
641 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
643 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
645 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
647 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
649 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
650 MockRaftActor.props(persistenceId,Collections.EMPTY_MAP,
651 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
653 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
655 ByteString snapshotBytes = fromObject(Arrays.asList(
656 new MockRaftActorContext.MockPayload("A"),
657 new MockRaftActorContext.MockPayload("B"),
658 new MockRaftActorContext.MockPayload("C"),
659 new MockRaftActorContext.MockPayload("D")));
661 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
663 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
665 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
667 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
669 verify(dataPersistenceProvider).saveSnapshot(anyObject());
// Stop the actor at the end of the test.
671 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Runs the snapshot sequence on a Follower with five log entries (indexes 0-4):
// CaptureSnapshot, CaptureSnapshotReply, SaveSnapshotSuccess. Expects createSnapshot() on the
// delegate, deleteSnapshots() and deleteMessages(100) on the mocked provider, and a log that
// keeps only the entries at indexes 3 and 4.
678 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
679 new JavaTestKit(getSystem()) {
681 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
683 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
685 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
687 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
689 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
690 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
692 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
// Populate the log directly (append, not appendAndPersist) with entries at indexes 0-4.
694 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
695 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
696 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
697 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
698 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
700 ByteString snapshotBytes = fromObject(Arrays.asList(
701 new MockRaftActorContext.MockPayload("A"),
702 new MockRaftActorContext.MockPayload("B"),
703 new MockRaftActorContext.MockPayload("C"),
704 new MockRaftActorContext.MockPayload("D")));
706 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
707 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
709 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
711 verify(mockRaftActor.delegate).createSnapshot();
713 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
// The sequence number 100 from this metadata is the value expected in deleteMessages(100).
715 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
717 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
719 verify(dataPersistenceProvider).deleteMessages(100);
721 assertEquals(2, mockRaftActor.getReplicatedLog().size());
723 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
724 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
726 // Index 2 will not be in the log because it was removed due to snapshotting
727 assertNull(mockRaftActor.getReplicatedLog().get(2));
// Stop the actor at the end of the test.
729 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Sends an ApplyState message to onReceiveCommand() and verifies that the applyState()
// callback was invoked with the same client actor and identifier.
736 public void testApplyState() throws Exception {
738 new JavaTestKit(getSystem()) {
740 String persistenceId = "testApplyState";
742 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
744 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
746 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
748 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
749 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
751 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
753 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
754 new MockRaftActorContext.MockPayload("F"));
756 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
758 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
// Stop the actor at the end of the test.
760 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Sends an ApplySnapshot carrying a mocked Snapshot (state bytes A-D, lastAppliedIndex 3) to
// an actor whose log holds three entries. Expects applySnapshot() on the delegate with the
// same bytes, a replaced ReplicatedLog instance, lastApplied equal to 3 and an empty log.
767 public void testApplySnapshot() throws Exception {
768 new JavaTestKit(getSystem()) {
770 String persistenceId = "testApplySnapshot";
772 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
774 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
776 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
778 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
779 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
781 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
// Keep a reference to the current log so the identity check below can detect replacement.
783 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
785 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
786 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
787 oldReplicatedLog.append(
788 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
789 mock(Payload.class)));
791 ByteString snapshotBytes = fromObject(Arrays.asList(
792 new MockRaftActorContext.MockPayload("A"),
793 new MockRaftActorContext.MockPayload("B"),
794 new MockRaftActorContext.MockPayload("C"),
795 new MockRaftActorContext.MockPayload("D")));
// Only getState() and getLastAppliedIndex() are stubbed on the mocked Snapshot.
797 Snapshot snapshot = mock(Snapshot.class);
799 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
801 doReturn(3L).when(snapshot).getLastAppliedIndex();
803 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
805 verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
807 assertTrue("The replicatedLog should have changed",
808 oldReplicatedLog != mockRaftActor.getReplicatedLog());
810 assertEquals("lastApplied should be same as in the snapshot",
811 (Long) 3L, mockRaftActor.getLastApplied());
813 assertEquals(0, mockRaftActor.getReplicatedLog().size());
// Stop the actor at the end of the test.
815 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Runs CaptureSnapshot and CaptureSnapshotReply on a Leader, then delivers a
// SaveSnapshotFailure and asserts that the replicated log's snapshot index is still -1.
822 public void testSaveSnapshotFailure() throws Exception {
823 new JavaTestKit(getSystem()) {
825 String persistenceId = "testSaveSnapshotFailure";
827 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
829 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
831 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
833 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
834 Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
836 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
838 ByteString snapshotBytes = fromObject(Arrays.asList(
839 new MockRaftActorContext.MockPayload("A"),
840 new MockRaftActorContext.MockPayload("B"),
841 new MockRaftActorContext.MockPayload("C"),
842 new MockRaftActorContext.MockPayload("D")));
844 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
846 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
848 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
850 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
852 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
855 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
856 mockRaftActor.getReplicatedLog().getSnapshotIndex());
// Stop the actor at the end of the test.
858 mockActorRef.tell(PoisonPill.getInstance(), getRef());
// Test helper: serializes the given object with Java serialization (ObjectOutputStream over a
// ByteArrayOutputStream) and returns the resulting bytes copied into a ByteString.
// NOTE(review): the streams are declared up front and assigned later; try-with-resources would
// be the simpler form - confirm against the rest of the method.
864 private ByteString fromObject(Object snapshot) throws Exception {
865 ByteArrayOutputStream b = null;
866 ObjectOutputStream o = null;
868 b = new ByteArrayOutputStream();
869 o = new ObjectOutputStream(b);
870 o.writeObject(snapshot);
871 byte[] snapshotBytes = b.toByteArray();
872 return ByteString.copyFrom(snapshotBytes);