1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.RoleChanged;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
62 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
68 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
70 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
71 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
72 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
73 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
74 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.Duration;
78 import scala.concurrent.duration.FiniteDuration;
80 public class RaftActorTest extends AbstractActorTest {
82 private TestActorFactory factory;
86 factory = new TestActorFactory(getSystem());
90 public void tearDown() throws Exception {
92 MockAkkaJournal.clearJournal();
93 MockSnapshotStore.setMockSnapshot(null);
96 public static class MockRaftActor extends RaftActor {
98 private final DataPersistenceProvider dataPersistenceProvider;
99 private final RaftActor delegate;
100 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
101 private final List<Object> state;
102 private ActorRef roleChangeNotifier;
103 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
105 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
106 private static final long serialVersionUID = 1L;
107 private final Map<String, String> peerAddresses;
108 private final String id;
109 private final Optional<ConfigParams> config;
110 private final DataPersistenceProvider dataPersistenceProvider;
111 private final ActorRef roleChangeNotifier;
113 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
114 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
115 ActorRef roleChangeNotifier) {
116 this.peerAddresses = peerAddresses;
118 this.config = config;
119 this.dataPersistenceProvider = dataPersistenceProvider;
120 this.roleChangeNotifier = roleChangeNotifier;
124 public MockRaftActor create() throws Exception {
125 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
126 dataPersistenceProvider);
127 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
128 return mockRaftActor;
132 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
133 DataPersistenceProvider dataPersistenceProvider) {
134 super(id, peerAddresses, config);
135 state = new ArrayList<>();
136 this.delegate = mock(RaftActor.class);
137 if(dataPersistenceProvider == null){
138 this.dataPersistenceProvider = new PersistentDataProvider();
140 this.dataPersistenceProvider = dataPersistenceProvider;
144 public void waitForRecoveryComplete() {
146 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
147 } catch (InterruptedException e) {
152 public void waitForInitializeBehaviorComplete() {
154 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
155 } catch (InterruptedException e) {
160 public List<Object> getState() {
164 public static Props props(final String id, final Map<String, String> peerAddresses,
165 Optional<ConfigParams> config){
166 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
169 public static Props props(final String id, final Map<String, String> peerAddresses,
170 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
171 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
174 public static Props props(final String id, final Map<String, String> peerAddresses,
175 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
176 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
179 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
180 delegate.applyState(clientActor, identifier, data);
181 LOG.info("applyState called");
185 protected void startLogRecoveryBatch(int maxBatchSize) {
189 protected void appendRecoveredLogEntry(Payload data) {
194 protected void applyCurrentLogRecoveryBatch() {
198 protected void onRecoveryComplete() {
199 delegate.onRecoveryComplete();
200 recoveryComplete.countDown();
204 protected void initializeBehavior() {
205 super.initializeBehavior();
206 initializeBehaviorComplete.countDown();
210 protected void applyRecoverySnapshot(byte[] bytes) {
211 delegate.applyRecoverySnapshot(bytes);
213 Object data = toObject(bytes);
214 if (data instanceof List) {
215 state.addAll((List<?>) data);
217 } catch (Exception e) {
222 @Override protected void createSnapshot() {
223 delegate.createSnapshot();
226 @Override protected void applySnapshot(byte [] snapshot) {
227 delegate.applySnapshot(snapshot);
230 @Override protected void onStateChanged() {
231 delegate.onStateChanged();
235 protected DataPersistenceProvider persistence() {
236 return this.dataPersistenceProvider;
240 protected Optional<ActorRef> getRoleChangeNotifier() {
241 return Optional.fromNullable(roleChangeNotifier);
244 @Override public String persistenceId() {
248 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
250 ByteArrayInputStream bis = null;
251 ObjectInputStream ois = null;
253 bis = new ByteArrayInputStream(bs);
254 ois = new ObjectInputStream(bis);
255 obj = ois.readObject();
267 public ReplicatedLog getReplicatedLog(){
268 return this.getRaftActorContext().getReplicatedLog();
274 private static class RaftActorTestKit extends JavaTestKit {
275 private final ActorRef raftActor;
277 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
280 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
281 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
286 public ActorRef getRaftActor() {
290 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
291 // Wait for a specific log message to show up
293 new JavaTestKit.EventFilter<Boolean>(logEventClass
296 protected Boolean run() {
299 }.from(raftActor.path().toString())
301 .occurrences(1).exec();
306 protected void waitUntilLeader(){
307 waitUntilLeader(raftActor);
310 protected void waitUntilLeader(ActorRef actorRef) {
311 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
312 for(int i = 0; i < 20 * 5; i++) {
313 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
315 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
316 if(resp.getLeaderActor() != null) {
319 } catch(TimeoutException e) {
320 } catch(Exception e) {
321 System.err.println("FindLeader threw ex");
326 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
329 Assert.fail("Leader not found for actorRef " + actorRef.path());
336 public void testConstruction() {
337 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
341 public void testFindLeaderWhenLeaderIsSelf(){
342 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
343 kit.waitUntilLeader();
347 public void testRaftActorRecovery() throws Exception {
348 new JavaTestKit(getSystem()) {{
349 String persistenceId = factory.generateActorId("follower-");
351 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
352 // Set the heartbeat interval high to essentially disable election otherwise the test
353 // may fail if the actor is switched to Leader and the commitIndex is set to the last
355 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
357 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
358 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
360 watch(followerActor);
362 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
363 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
364 new MockRaftActorContext.MockPayload("E"));
365 snapshotUnappliedEntries.add(entry1);
367 int lastAppliedDuringSnapshotCapture = 3;
368 int lastIndexDuringSnapshotCapture = 4;
370 // 4 messages as part of snapshot, which are applied to state
371 ByteString snapshotBytes = fromObject(Arrays.asList(
372 new MockRaftActorContext.MockPayload("A"),
373 new MockRaftActorContext.MockPayload("B"),
374 new MockRaftActorContext.MockPayload("C"),
375 new MockRaftActorContext.MockPayload("D")));
377 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
378 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
379 lastAppliedDuringSnapshotCapture, 1);
380 MockSnapshotStore.setMockSnapshot(snapshot);
381 MockSnapshotStore.setPersistenceId(persistenceId);
383 // add more entries after snapshot is taken
384 List<ReplicatedLogEntry> entries = new ArrayList<>();
385 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
386 new MockRaftActorContext.MockPayload("F"));
387 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
388 new MockRaftActorContext.MockPayload("G"));
389 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
390 new MockRaftActorContext.MockPayload("H"));
395 int lastAppliedToState = 5;
398 MockAkkaJournal.addToJournal(5, entry2);
399 // 2 entries are applied to state besides the 4 entries in snapshot
400 MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
401 MockAkkaJournal.addToJournal(7, entry3);
402 MockAkkaJournal.addToJournal(8, entry4);
406 followerActor.tell(PoisonPill.getInstance(), null);
407 expectMsgClass(duration("5 seconds"), Terminated.class);
409 unwatch(followerActor);
411 //reinstate the actor
412 TestActorRef<MockRaftActor> ref = factory.createTestActor(
413 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
414 Optional.<ConfigParams>of(config)));
416 ref.underlyingActor().waitForRecoveryComplete();
418 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
419 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
420 context.getReplicatedLog().size());
421 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
422 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
423 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
424 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
429 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
430 new JavaTestKit(getSystem()) {{
431 String persistenceId = factory.generateActorId("leader-");
433 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
434 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
436 // Setup the persisted journal with some entries
437 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
438 new MockRaftActorContext.MockPayload("zero"));
439 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
440 new MockRaftActorContext.MockPayload("oen"));
441 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
442 new MockRaftActorContext.MockPayload("two"));
445 MockAkkaJournal.addToJournal(seqNr++, entry0);
446 MockAkkaJournal.addToJournal(seqNr++, entry1);
447 MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
448 MockAkkaJournal.addToJournal(seqNr++, entry2);
450 int lastAppliedToState = 1;
453 //reinstate the actor
454 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
455 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
456 Optional.<ConfigParams>of(config)));
458 leaderActor.underlyingActor().waitForRecoveryComplete();
460 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
461 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
462 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
463 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
464 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
469 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
470 * process recovery messages
476 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
477 new JavaTestKit(getSystem()) {
479 String persistenceId = factory.generateActorId("leader-");
481 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
483 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
485 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
486 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
488 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
490 // Wait for akka's recovery to complete so it doesn't interfere.
491 mockRaftActor.waitForRecoveryComplete();
493 ByteString snapshotBytes = fromObject(Arrays.asList(
494 new MockRaftActorContext.MockPayload("A"),
495 new MockRaftActorContext.MockPayload("B"),
496 new MockRaftActorContext.MockPayload("C"),
497 new MockRaftActorContext.MockPayload("D")));
499 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
500 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
502 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
504 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
506 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
508 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
510 assertEquals("add replicated log entry", 1, replicatedLog.size());
512 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
514 assertEquals("add replicated log entry", 2, replicatedLog.size());
516 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
518 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
520 // The snapshot had 4 items + we added 2 more items during the test
521 // We start removing from 5 and we should get 1 item in the replicated log
522 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
524 assertEquals("remove log entries", 1, replicatedLog.size());
526 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
528 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
529 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
531 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
537 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
538 * not process recovery messages
543 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
544 new JavaTestKit(getSystem()) {
546 String persistenceId = factory.generateActorId("leader-");
548 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
550 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
552 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
553 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
555 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
557 // Wait for akka's recovery to complete so it doesn't interfere.
558 mockRaftActor.waitForRecoveryComplete();
560 ByteString snapshotBytes = fromObject(Arrays.asList(
561 new MockRaftActorContext.MockPayload("A"),
562 new MockRaftActorContext.MockPayload("B"),
563 new MockRaftActorContext.MockPayload("C"),
564 new MockRaftActorContext.MockPayload("D")));
566 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
567 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
569 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
571 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
573 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
575 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
577 assertEquals("add replicated log entry", 0, replicatedLog.size());
579 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
581 assertEquals("add replicated log entry", 0, replicatedLog.size());
583 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
585 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
587 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
589 assertEquals("remove log entries", 0, replicatedLog.size());
591 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
593 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
594 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
596 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
602 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
603 new JavaTestKit(getSystem()) {
605 String persistenceId = factory.generateActorId("leader-");
607 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
609 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
611 CountDownLatch persistLatch = new CountDownLatch(1);
612 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
613 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
615 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
616 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
618 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
620 mockRaftActor.waitForInitializeBehaviorComplete();
622 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
624 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
630 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
631 new JavaTestKit(getSystem()) {
633 String persistenceId = factory.generateActorId("leader-");
635 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
637 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
639 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
641 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
642 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
644 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
646 mockRaftActor.waitForInitializeBehaviorComplete();
648 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
650 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
652 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
658 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
659 new JavaTestKit(getSystem()) {
661 String persistenceId = factory.generateActorId("leader-");
663 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
665 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
667 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
669 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
670 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
672 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
674 mockRaftActor.waitForInitializeBehaviorComplete();
676 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
678 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
680 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
686 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
687 new JavaTestKit(getSystem()) {
689 String persistenceId = factory.generateActorId("leader-");
691 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
693 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
695 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
697 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
698 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
700 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
702 mockRaftActor.waitForInitializeBehaviorComplete();
704 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
706 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
714 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
715 new JavaTestKit(getSystem()) {
717 String persistenceId = factory.generateActorId("leader-");
719 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
721 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
723 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
725 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
726 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
727 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
729 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
731 mockRaftActor.waitForInitializeBehaviorComplete();
733 ByteString snapshotBytes = fromObject(Arrays.asList(
734 new MockRaftActorContext.MockPayload("A"),
735 new MockRaftActorContext.MockPayload("B"),
736 new MockRaftActorContext.MockPayload("C"),
737 new MockRaftActorContext.MockPayload("D")));
739 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
741 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
743 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
745 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
747 verify(dataPersistenceProvider).saveSnapshot(anyObject());
754 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
755 new JavaTestKit(getSystem()) {
757 String persistenceId = factory.generateActorId("leader-");
759 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
761 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
763 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
765 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
766 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
768 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
770 mockRaftActor.waitForInitializeBehaviorComplete();
772 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
773 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
774 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
775 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
776 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
778 ByteString snapshotBytes = fromObject(Arrays.asList(
779 new MockRaftActorContext.MockPayload("A"),
780 new MockRaftActorContext.MockPayload("B"),
781 new MockRaftActorContext.MockPayload("C"),
782 new MockRaftActorContext.MockPayload("D")));
784 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
785 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
787 long replicatedToAllIndex = 1;
788 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
790 verify(mockRaftActor.delegate).createSnapshot();
792 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
794 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
796 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
798 verify(dataPersistenceProvider).deleteMessages(100);
800 assertEquals(3, mockRaftActor.getReplicatedLog().size());
801 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
803 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
804 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
805 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
807 // Index 2 will not be in the log because it was removed due to snapshotting
808 assertNull(mockRaftActor.getReplicatedLog().get(1));
809 assertNull(mockRaftActor.getReplicatedLog().get(0));
816 public void testApplyState() throws Exception {
818 new JavaTestKit(getSystem()) {
820 String persistenceId = factory.generateActorId("leader-");
822 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
824 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
826 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
828 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
829 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
831 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
833 mockRaftActor.waitForInitializeBehaviorComplete();
835 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
836 new MockRaftActorContext.MockPayload("F"));
838 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
840 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
847 public void testApplySnapshot() throws Exception {
848 new JavaTestKit(getSystem()) {
850 String persistenceId = factory.generateActorId("leader-");
852 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
854 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
856 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
858 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
859 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
861 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
863 mockRaftActor.waitForInitializeBehaviorComplete();
865 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
867 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
868 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
869 oldReplicatedLog.append(
870 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
871 mock(Payload.class)));
873 ByteString snapshotBytes = fromObject(Arrays.asList(
874 new MockRaftActorContext.MockPayload("A"),
875 new MockRaftActorContext.MockPayload("B"),
876 new MockRaftActorContext.MockPayload("C"),
877 new MockRaftActorContext.MockPayload("D")));
879 Snapshot snapshot = mock(Snapshot.class);
881 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
883 doReturn(3L).when(snapshot).getLastAppliedIndex();
885 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
887 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
889 assertTrue("The replicatedLog should have changed",
890 oldReplicatedLog != mockRaftActor.getReplicatedLog());
892 assertEquals("lastApplied should be same as in the snapshot",
893 (Long) 3L, mockRaftActor.getLastApplied());
895 assertEquals(0, mockRaftActor.getReplicatedLog().size());
902 public void testSaveSnapshotFailure() throws Exception {
903 new JavaTestKit(getSystem()) {
905 String persistenceId = factory.generateActorId("leader-");
907 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
909 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
911 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
913 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
914 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
916 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
918 mockRaftActor.waitForInitializeBehaviorComplete();
920 ByteString snapshotBytes = fromObject(Arrays.asList(
921 new MockRaftActorContext.MockPayload("A"),
922 new MockRaftActorContext.MockPayload("B"),
923 new MockRaftActorContext.MockPayload("C"),
924 new MockRaftActorContext.MockPayload("D")));
926 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
928 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
930 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
932 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
934 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
937 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
938 mockRaftActor.getReplicatedLog().getSnapshotIndex());
945 public void testRaftRoleChangeNotifier() throws Exception {
946 new JavaTestKit(getSystem()) {{
947 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
948 MessageCollectorActor.waitUntilReady(notifierActor);
950 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
951 long heartBeatInterval = 100;
952 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
953 config.setElectionTimeoutFactor(1);
955 String persistenceId = factory.generateActorId("notifier-");
957 factory.createTestActor(MockRaftActor.props(persistenceId,
958 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
960 List<RoleChanged> matches = null;
961 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
962 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
963 assertNotNull(matches);
964 if(matches.size() == 3) {
967 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
970 assertEquals(3, matches.size());
972 // check if the notifier got a role change from null to Follower
973 RoleChanged raftRoleChanged = matches.get(0);
974 assertEquals(persistenceId, raftRoleChanged.getMemberId());
975 assertNull(raftRoleChanged.getOldRole());
976 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
978 // check if the notifier got a role change from Follower to Candidate
979 raftRoleChanged = matches.get(1);
980 assertEquals(persistenceId, raftRoleChanged.getMemberId());
981 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
982 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
984 // check if the notifier got a role change from Candidate to Leader
985 raftRoleChanged = matches.get(2);
986 assertEquals(persistenceId, raftRoleChanged.getMemberId());
987 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
988 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
993 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
994 new JavaTestKit(getSystem()) {
996 String persistenceId = factory.generateActorId("leader-");
997 String follower1Id = factory.generateActorId("follower-");
999 ActorRef followerActor1 =
1000 factory.createActor(Props.create(MessageCollectorActor.class));
1002 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1003 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1004 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1006 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1008 Map<String, String> peerAddresses = new HashMap<>();
1009 peerAddresses.put(follower1Id, followerActor1.path().toString());
1011 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1012 MockRaftActor.props(persistenceId, peerAddresses,
1013 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1015 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1017 leaderActor.getRaftActorContext().setCommitIndex(4);
1018 leaderActor.getRaftActorContext().setLastApplied(4);
1019 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1021 leaderActor.waitForInitializeBehaviorComplete();
1023 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1025 Leader leader = new Leader(leaderActor.getRaftActorContext());
1026 leaderActor.setCurrentBehavior(leader);
1027 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1029 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1030 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1032 assertEquals(8, leaderActor.getReplicatedLog().size());
1034 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
1036 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1037 verify(leaderActor.delegate).createSnapshot();
1039 assertEquals(8, leaderActor.getReplicatedLog().size());
1041 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1042 //fake snapshot on index 5
1043 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1045 assertEquals(8, leaderActor.getReplicatedLog().size());
1047 //fake snapshot on index 6
1048 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1049 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1050 assertEquals(8, leaderActor.getReplicatedLog().size());
1052 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1054 assertEquals(8, leaderActor.getReplicatedLog().size());
1056 ByteString snapshotBytes = fromObject(Arrays.asList(
1057 new MockRaftActorContext.MockPayload("foo-0"),
1058 new MockRaftActorContext.MockPayload("foo-1"),
1059 new MockRaftActorContext.MockPayload("foo-2"),
1060 new MockRaftActorContext.MockPayload("foo-3"),
1061 new MockRaftActorContext.MockPayload("foo-4")));
1062 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1063 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1065 // capture snapshot reply should remove the snapshotted entries only
1066 assertEquals(3, leaderActor.getReplicatedLog().size());
1067 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1069 // add another non-replicated entry
1070 leaderActor.getReplicatedLog().append(
1071 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1073 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1074 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1075 assertEquals(2, leaderActor.getReplicatedLog().size());
1076 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1083 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1084 new JavaTestKit(getSystem()) {
1086 String persistenceId = factory.generateActorId("follower-");
1087 String leaderId = factory.generateActorId("leader-");
1090 ActorRef leaderActor1 =
1091 factory.createActor(Props.create(MessageCollectorActor.class));
1093 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1094 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1095 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1097 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1099 Map<String, String> peerAddresses = new HashMap<>();
1100 peerAddresses.put(leaderId, leaderActor1.path().toString());
1102 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1103 MockRaftActor.props(persistenceId, peerAddresses,
1104 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1106 MockRaftActor followerActor = mockActorRef.underlyingActor();
1107 followerActor.getRaftActorContext().setCommitIndex(4);
1108 followerActor.getRaftActorContext().setLastApplied(4);
1109 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1111 followerActor.waitForInitializeBehaviorComplete();
1114 Follower follower = new Follower(followerActor.getRaftActorContext());
1115 followerActor.setCurrentBehavior(follower);
1116 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1118 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1119 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1120 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1122 // log has indices 0-5
1123 assertEquals(6, followerActor.getReplicatedLog().size());
1126 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1128 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1129 verify(followerActor.delegate).createSnapshot();
1131 assertEquals(6, followerActor.getReplicatedLog().size());
1133 //fake snapshot on index 6
1134 List<ReplicatedLogEntry> entries =
1136 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1137 new MockRaftActorContext.MockPayload("foo-6"))
1139 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1140 assertEquals(7, followerActor.getReplicatedLog().size());
1142 //fake snapshot on index 7
1143 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1147 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1148 new MockRaftActorContext.MockPayload("foo-7"))
1150 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1151 assertEquals(8, followerActor.getReplicatedLog().size());
1153 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1156 ByteString snapshotBytes = fromObject(Arrays.asList(
1157 new MockRaftActorContext.MockPayload("foo-0"),
1158 new MockRaftActorContext.MockPayload("foo-1"),
1159 new MockRaftActorContext.MockPayload("foo-2"),
1160 new MockRaftActorContext.MockPayload("foo-3"),
1161 new MockRaftActorContext.MockPayload("foo-4")));
1162 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1163 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1165 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1166 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1167 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1171 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1172 new MockRaftActorContext.MockPayload("foo-7"))
1174 // send an additional entry 8 with leaderCommit = 7
1175 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1177 // 7 and 8, as lastapplied is 7
1178 assertEquals(2, followerActor.getReplicatedLog().size());
1185 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1186 new JavaTestKit(getSystem()) {
1188 String persistenceId = factory.generateActorId("leader-");
1189 String follower1Id = factory.generateActorId("follower-");
1190 String follower2Id = factory.generateActorId("follower-");
1192 ActorRef followerActor1 =
1193 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1194 ActorRef followerActor2 =
1195 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1197 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1198 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1199 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1201 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1203 Map<String, String> peerAddresses = new HashMap<>();
1204 peerAddresses.put(follower1Id, followerActor1.path().toString());
1205 peerAddresses.put(follower2Id, followerActor2.path().toString());
1207 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1208 MockRaftActor.props(persistenceId, peerAddresses,
1209 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1211 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1212 leaderActor.getRaftActorContext().setCommitIndex(9);
1213 leaderActor.getRaftActorContext().setLastApplied(9);
1214 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1216 leaderActor.waitForInitializeBehaviorComplete();
1218 Leader leader = new Leader(leaderActor.getRaftActorContext());
1219 leaderActor.setCurrentBehavior(leader);
1220 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1222 // create 5 entries in the log
1223 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1224 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1226 //set the snapshot index to 4 , 0 to 4 are snapshotted
1227 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1228 //setting replicatedToAllIndex = 9, for the log to clear
1229 leader.setReplicatedToAllIndex(9);
1230 assertEquals(5, leaderActor.getReplicatedLog().size());
1231 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1233 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1234 assertEquals(5, leaderActor.getReplicatedLog().size());
1235 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1237 // set the 2nd follower nextIndex to 1 which has been snapshotted
1238 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1239 assertEquals(5, leaderActor.getReplicatedLog().size());
1240 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1242 // simulate a real snapshot
1243 leaderActor.onReceiveCommand(new SendHeartBeat());
1244 assertEquals(5, leaderActor.getReplicatedLog().size());
1245 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1246 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1247 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1250 //reply from a slow follower does not initiate a fake snapshot
1251 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1252 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1254 ByteString snapshotBytes = fromObject(Arrays.asList(
1255 new MockRaftActorContext.MockPayload("foo-0"),
1256 new MockRaftActorContext.MockPayload("foo-1"),
1257 new MockRaftActorContext.MockPayload("foo-2"),
1258 new MockRaftActorContext.MockPayload("foo-3"),
1259 new MockRaftActorContext.MockPayload("foo-4")));
1260 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1261 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1263 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1265 //reply from a slow follower after should not raise errors
1266 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1267 assertEquals(0, leaderActor.getReplicatedLog().size());
1273 private static class NonPersistentProvider implements DataPersistenceProvider {
1275 public boolean isRecoveryApplicable() {
1280 public <T> void persist(T o, Procedure<T> procedure) {
1283 } catch (Exception e) {
1284 e.printStackTrace();
1289 public void saveSnapshot(Object o) {
1294 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1299 public void deleteMessages(long sequenceNumber) {
1305 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1306 new JavaTestKit(getSystem()) {{
1307 String persistenceId = factory.generateActorId("leader-");
1308 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1309 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1310 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1311 config.setSnapshotBatchCount(5);
1313 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1315 Map<String, String> peerAddresses = new HashMap<>();
1317 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1318 MockRaftActor.props(persistenceId, peerAddresses,
1319 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1321 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1322 leaderActor.getRaftActorContext().setCommitIndex(3);
1323 leaderActor.getRaftActorContext().setLastApplied(3);
1324 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1326 leaderActor.waitForInitializeBehaviorComplete();
1327 for(int i=0;i< 4;i++) {
1328 leaderActor.getReplicatedLog()
1329 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1330 new MockRaftActorContext.MockPayload("A")));
1333 Leader leader = new Leader(leaderActor.getRaftActorContext());
1334 leaderActor.setCurrentBehavior(leader);
1335 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1337 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1338 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1340 // Now send a CaptureSnapshotReply
1341 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1343 // Trimming log in this scenario is a no-op
1344 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1345 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1346 assertEquals(-1, leader.getReplicatedToAllIndex());
1352 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1353 new JavaTestKit(getSystem()) {{
1354 String persistenceId = factory.generateActorId("leader-");
1355 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1356 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1357 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1358 config.setSnapshotBatchCount(5);
1360 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1362 Map<String, String> peerAddresses = new HashMap<>();
1364 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1365 MockRaftActor.props(persistenceId, peerAddresses,
1366 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1368 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1369 leaderActor.getRaftActorContext().setCommitIndex(3);
1370 leaderActor.getRaftActorContext().setLastApplied(3);
1371 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1372 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1374 leaderActor.waitForInitializeBehaviorComplete();
1375 Leader leader = new Leader(leaderActor.getRaftActorContext());
1376 leaderActor.setCurrentBehavior(leader);
1377 leader.setReplicatedToAllIndex(3);
1378 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1380 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1381 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1383 // Now send a CaptureSnapshotReply
1384 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1386 // Trimming log in this scenario is a no-op
1387 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1388 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1389 assertEquals(3, leader.getReplicatedToAllIndex());
1394 private ByteString fromObject(Object snapshot) throws Exception {
1395 ByteArrayOutputStream b = null;
1396 ObjectOutputStream o = null;
1398 b = new ByteArrayOutputStream();
1399 o = new ObjectOutputStream(b);
1400 o.writeObject(snapshot);
1401 byte[] snapshotBytes = b.toByteArray();
1402 return ByteString.copyFrom(snapshotBytes);