1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.RoleChanged;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
62 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
64 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
68 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
70 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
71 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
72 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
73 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
74 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.Duration;
78 import scala.concurrent.duration.FiniteDuration;
80 public class RaftActorTest extends AbstractActorTest {
82 private TestActorFactory factory;
86 factory = new TestActorFactory(getSystem());
/** Test teardown: clears the static InMemoryJournal and InMemorySnapshotStore test stores. */
90 public void tearDown() throws Exception {
92 InMemoryJournal.clear();
93 InMemorySnapshotStore.clear();
/**
 * RaftActor subclass used by the tests in this file. The overridden callbacks forward to a
 * Mockito mock of RaftActor ({@code delegate}) so tests can verify the calls, and two latches
 * let tests wait for recovery completion and for behavior initialization.
 */
96 public static class MockRaftActor extends RaftActor {
98 private final DataPersistenceProvider dataPersistenceProvider;
// Mockito mock that records the callbacks forwarded by the overrides below.
99 private final RaftActor delegate;
// Counted down in onRecoveryComplete().
100 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Receives the elements of List-valued recovery snapshots (see applyRecoverySnapshot).
101 private final List<Object> state;
102 private ActorRef roleChangeNotifier;
// Counted down after super.initializeBehavior() has run.
103 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
/** Akka Creator that builds a MockRaftActor and then sets its role-change notifier. */
105 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
106 private static final long serialVersionUID = 1L;
107 private final Map<String, String> peerAddresses;
108 private final String id;
109 private final Optional<ConfigParams> config;
110 private final DataPersistenceProvider dataPersistenceProvider;
111 private final ActorRef roleChangeNotifier;
113 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
114 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
115 ActorRef roleChangeNotifier) {
116 this.peerAddresses = peerAddresses;
118 this.config = config;
119 this.dataPersistenceProvider = dataPersistenceProvider;
120 this.roleChangeNotifier = roleChangeNotifier;
124 public MockRaftActor create() throws Exception {
125 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
126 dataPersistenceProvider);
// The notifier is not a constructor argument, so it is assigned after construction.
127 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
128 return mockRaftActor;
132 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
133 DataPersistenceProvider dataPersistenceProvider) {
134 super(id, peerAddresses, config);
135 state = new ArrayList<>();
136 this.delegate = mock(RaftActor.class);
// A null provider falls back to a PersistentDataProvider.
137 if(dataPersistenceProvider == null){
138 this.dataPersistenceProvider = new PersistentDataProvider();
140 this.dataPersistenceProvider = dataPersistenceProvider;
/** Waits up to 5 seconds for the recovery latch and asserts that it was released. */
144 public void waitForRecoveryComplete() {
146 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
147 } catch (InterruptedException e) {
/** Waits up to 5 seconds for the initialize-behavior latch and asserts that it was released. */
152 public void waitForInitializeBehaviorComplete() {
154 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
155 } catch (InterruptedException e) {
160 public List<Object> getState() {
/** Props for an actor with no explicit persistence provider and no role-change notifier. */
164 public static Props props(final String id, final Map<String, String> peerAddresses,
165 Optional<ConfigParams> config){
166 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
/** Props for an actor that uses the given persistence provider and no role-change notifier. */
169 public static Props props(final String id, final Map<String, String> peerAddresses,
170 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
171 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
/** Props for an actor with the given role-change notifier and no explicit persistence provider. */
174 public static Props props(final String id, final Map<String, String> peerAddresses,
175 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
176 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
179 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
180 delegate.applyState(clientActor, identifier, data);
181 LOG.info("applyState called");
185 protected void startLogRecoveryBatch(int maxBatchSize) {
189 protected void appendRecoveredLogEntry(Payload data) {
194 protected void applyCurrentLogRecoveryBatch() {
198 protected void onRecoveryComplete() {
199 delegate.onRecoveryComplete();
// Releases waitForRecoveryComplete().
200 recoveryComplete.countDown();
204 protected void initializeBehavior() {
205 super.initializeBehavior();
// Releases waitForInitializeBehaviorComplete().
206 initializeBehaviorComplete.countDown();
/**
 * Forwards the snapshot bytes to the delegate; when the deserialized object is a List, its
 * elements are added to {@code state}.
 */
210 protected void applyRecoverySnapshot(byte[] bytes) {
211 delegate.applyRecoverySnapshot(bytes);
213 Object data = toObject(bytes);
214 if (data instanceof List) {
215 state.addAll((List<?>) data);
217 } catch (Exception e) {
222 @Override protected void createSnapshot() {
223 delegate.createSnapshot();
226 @Override protected void applySnapshot(byte [] snapshot) {
227 delegate.applySnapshot(snapshot);
230 @Override protected void onStateChanged() {
231 delegate.onStateChanged();
235 protected DataPersistenceProvider persistence() {
236 return this.dataPersistenceProvider;
240 protected Optional<ActorRef> getRoleChangeNotifier() {
// Absent when no notifier was supplied.
241 return Optional.fromNullable(roleChangeNotifier);
244 @Override public String persistenceId() {
/** Deserializes {@code bs} with Java serialization (ObjectInputStream.readObject). */
248 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
250 ByteArrayInputStream bis = null;
251 ObjectInputStream ois = null;
253 bis = new ByteArrayInputStream(bs);
254 ois = new ObjectInputStream(bis);
255 obj = ois.readObject();
/** Convenience accessor for the replicated log held by the actor's RaftActorContext. */
267 public ReplicatedLog getReplicatedLog(){
268 return this.getRaftActorContext().getReplicatedLog();
/**
 * JavaTestKit that starts a MockRaftActor with no peers and no explicit ConfigParams, and
 * provides helpers for waiting on that actor.
 */
274 private static class RaftActorTestKit extends JavaTestKit {
275 private final ActorRef raftActor;
277 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
// actorName is used both as the MockRaftActor id and as the Akka actor name.
280 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
281 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
286 public ActorRef getRaftActor() {
290 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
291 // Wait for a specific log message to show up
293 new JavaTestKit.EventFilter<Boolean>(logEventClass
296 protected Boolean run() {
// The filter is restricted to events from the raft actor's path and expects one occurrence.
299 }.from(raftActor.path().toString())
301 .occurrences(1).exec();
/** Waits until the actor created by this kit reports a leader. */
306 protected void waitUntilLeader(){
307 waitUntilLeader(raftActor);
/**
 * Polls {@code actorRef} with FindLeader for up to 100 attempts (20 * 5). Each ask uses a
 * 100 ms timeout and attempts are separated by a 50 ms sleep; Assert.fail is reached when
 * no attempt produced a leader.
 */
310 protected void waitUntilLeader(ActorRef actorRef) {
311 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
312 for(int i = 0; i < 20 * 5; i++) {
313 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
315 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
316 if(resp.getLeaderActor() != null) {
// A timed-out ask is ignored so the loop retries.
319 } catch(TimeoutException e) {
320 } catch(Exception e) {
321 System.err.println("FindLeader threw ex");
326 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
329 Assert.fail("Leader not found for actorRef " + actorRef.path());
/** Creates a peerless MockRaftActor through the test kit and waits until it reports a leader. */
336 public void testConstruction() {
337 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
/** Starts a peerless MockRaftActor named "testFindLeader" and waits for FindLeader to return a leader. */
341 public void testFindLeaderWhenLeaderIsSelf(){
342 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
343 kit.waitUntilLeader();
/**
 * Seeds the in-memory snapshot store and journal for one persistence id, stops the running
 * actor, recreates it, and checks that the recovered log size, last index, last applied index,
 * commit index and state size match what was persisted.
 */
347 public void testRaftActorRecovery() throws Exception {
348 new JavaTestKit(getSystem()) {{
349 String persistenceId = factory.generateActorId("follower-");
351 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
352 // Set the heartbeat interval high to essentially disable election otherwise the test
353 // may fail if the actor is switched to Leader and the commitIndex is set to the last
355 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
357 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
358 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
// Watched so the Terminated message can be awaited after the PoisonPill below.
360 watch(followerActor);
362 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
363 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
364 new MockRaftActorContext.MockPayload("E"));
365 snapshotUnappliedEntries.add(entry1);
367 int lastAppliedDuringSnapshotCapture = 3;
368 int lastIndexDuringSnapshotCapture = 4;
370 // 4 messages as part of snapshot, which are applied to state
371 ByteString snapshotBytes = fromObject(Arrays.asList(
372 new MockRaftActorContext.MockPayload("A"),
373 new MockRaftActorContext.MockPayload("B"),
374 new MockRaftActorContext.MockPayload("C"),
375 new MockRaftActorContext.MockPayload("D")));
377 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
378 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
379 lastAppliedDuringSnapshotCapture, 1);
380 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
382 // add more entries after snapshot is taken
383 List<ReplicatedLogEntry> entries = new ArrayList<>();
384 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
385 new MockRaftActorContext.MockPayload("F"));
386 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
387 new MockRaftActorContext.MockPayload("G"));
388 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
389 new MockRaftActorContext.MockPayload("H"));
394 int lastAppliedToState = 5;
// Journal sequence numbers 5-8: three log entries plus one ApplyJournalEntries marker.
397 InMemoryJournal.addEntry(persistenceId, 5, entry2);
398 // 2 entries are applied to state besides the 4 entries in snapshot
399 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
400 InMemoryJournal.addEntry(persistenceId, 7, entry3);
401 InMemoryJournal.addEntry(persistenceId, 8, entry4);
// Stop the first instance and wait for its termination before recreating the actor.
404 followerActor.tell(PoisonPill.getInstance(), null);
405 expectMsgClass(duration("5 seconds"), Terminated.class);
407 unwatch(followerActor);
409 //reinstate the actor
410 TestActorRef<MockRaftActor> ref = factory.createTestActor(
411 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
412 Optional.<ConfigParams>of(config)));
414 ref.underlyingActor().waitForRecoveryComplete();
416 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
417 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
418 context.getReplicatedLog().size());
419 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
420 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
421 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
422 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
/**
 * Recovery from a journal that contains an ApplyLogEntries message between log entries:
 * three log entries are expected in the recovered log, with last applied and commit index
 * both equal to 1.
 *
 * NOTE(review): "Lithuim" in the method name and the "oen" payload look like typos
 * (Lithium / one) - confirm before renaming.
 */
427 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
428 new JavaTestKit(getSystem()) {{
429 String persistenceId = factory.generateActorId("leader-");
431 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
432 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
434 // Setup the persisted journal with some entries
435 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
436 new MockRaftActorContext.MockPayload("zero"));
437 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
438 new MockRaftActorContext.MockPayload("oen"));
439 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
440 new MockRaftActorContext.MockPayload("two"));
// ApplyLogEntries(1) is journaled after entries 0 and 1 and before entry 2.
443 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
444 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
445 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
446 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
448 int lastAppliedToState = 1;
451 //reinstate the actor
452 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
453 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
454 Optional.<ConfigParams>of(config)));
456 leaderActor.underlyingActor().waitForRecoveryComplete();
458 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
459 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
460 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
461 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
462 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
467 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
468 * process recovery messages
474 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
475 new JavaTestKit(getSystem()) {
477 String persistenceId = factory.generateActorId("leader-");
479 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
481 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// No persistence provider is passed to props, so MockRaftActor uses its PersistentDataProvider default.
483 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
484 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
486 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
488 // Wait for akka's recovery to complete so it doesn't interfere.
489 mockRaftActor.waitForRecoveryComplete();
491 ByteString snapshotBytes = fromObject(Arrays.asList(
492 new MockRaftActorContext.MockPayload("A"),
493 new MockRaftActorContext.MockPayload("B"),
494 new MockRaftActorContext.MockPayload("C"),
495 new MockRaftActorContext.MockPayload("D")));
497 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
498 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// Recovery messages are fed directly to onReceiveRecover; each assertion checks that the message took effect.
500 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
502 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
504 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
506 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
508 assertEquals("add replicated log entry", 1, replicatedLog.size());
510 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
512 assertEquals("add replicated log entry", 2, replicatedLog.size());
514 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
516 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
518 // The snapshot had 4 items + we added 2 more items during the test
519 // We start removing from 5 and we should get 1 item in the replicated log
520 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
522 assertEquals("remove log entries", 1, replicatedLog.size());
524 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
526 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
527 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
529 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
535 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
536 * not process recovery messages
541 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
542 new JavaTestKit(getSystem()) {
544 String persistenceId = factory.generateActorId("leader-");
546 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
548 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// A DataPersistenceProviderMonitor is supplied as the persistence provider for this actor.
550 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
551 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
553 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
555 // Wait for akka's recovery to complete so it doesn't interfere.
556 mockRaftActor.waitForRecoveryComplete();
558 ByteString snapshotBytes = fromObject(Arrays.asList(
559 new MockRaftActorContext.MockPayload("A"),
560 new MockRaftActorContext.MockPayload("B"),
561 new MockRaftActorContext.MockPayload("C"),
562 new MockRaftActorContext.MockPayload("D")));
564 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
565 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// Same recovery messages as the "applicable" test; every assertion below expects the message to be ignored.
567 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
569 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
571 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
573 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
575 assertEquals("add replicated log entry", 0, replicatedLog.size());
577 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
579 assertEquals("add replicated log entry", 0, replicatedLog.size());
581 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
583 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
585 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
587 assertEquals("remove log entries", 0, replicatedLog.size());
589 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
591 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
592 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
594 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
/**
 * Checks that updateAndPersist on the actor's term information results in a persist call,
 * observed through the persist latch registered on the DataPersistenceProviderMonitor.
 */
600 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
601 new JavaTestKit(getSystem()) {
603 String persistenceId = factory.generateActorId("leader-");
605 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
607 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
609 CountDownLatch persistLatch = new CountDownLatch(1);
610 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
611 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
613 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
614 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
616 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
618 mockRaftActor.waitForInitializeBehaviorComplete();
620 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
// The latch must be released within 5 seconds.
622 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
/**
 * Checks that appendAndPersist on the replicated log calls persist on the (mocked)
 * DataPersistenceProvider with the appended entry.
 */
628 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
629 new JavaTestKit(getSystem()) {
631 String persistenceId = factory.generateActorId("leader-");
633 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
635 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
637 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
639 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
640 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
642 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
644 mockRaftActor.waitForInitializeBehaviorComplete();
646 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
648 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
650 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
/**
 * Appends one entry with appendAndPersist, removes it with removeFromAndPersist(0), and
 * verifies that the mocked DataPersistenceProvider saw two persist calls.
 */
656 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
657 new JavaTestKit(getSystem()) {
659 String persistenceId = factory.generateActorId("leader-");
661 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
663 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
665 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
667 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
668 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
670 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
672 mockRaftActor.waitForInitializeBehaviorComplete();
674 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
676 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
// Presumably one persist for the append and one for the removal - confirm against ReplicatedLog.
678 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
/**
 * Sends ApplyJournalEntries as a command and verifies that the mocked DataPersistenceProvider
 * received exactly one persist call.
 */
684 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
685 new JavaTestKit(getSystem()) {
687 String persistenceId = factory.generateActorId("leader-");
689 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
691 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
693 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
695 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
696 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
698 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
700 mockRaftActor.waitForInitializeBehaviorComplete();
702 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
704 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
/**
 * Sends CaptureSnapshot, switches the actor to a Leader behavior, then sends
 * CaptureSnapshotReply and verifies that saveSnapshot was called on the mocked
 * DataPersistenceProvider.
 */
712 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
713 new JavaTestKit(getSystem()) {
715 String persistenceId = factory.generateActorId("leader-");
717 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
719 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
721 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
723 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
724 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
725 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
727 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
729 mockRaftActor.waitForInitializeBehaviorComplete();
// Serialized list of four payloads used as the snapshot state in the reply below.
731 ByteString snapshotBytes = fromObject(Arrays.asList(
732 new MockRaftActorContext.MockPayload("A"),
733 new MockRaftActorContext.MockPayload("B"),
734 new MockRaftActorContext.MockPayload("C"),
735 new MockRaftActorContext.MockPayload("D")));
737 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
739 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
741 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
743 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
745 verify(dataPersistenceProvider).saveSnapshot(anyObject());
/**
 * Appends five log entries (indexes 0-4), runs a CaptureSnapshot / CaptureSnapshotReply /
 * SaveSnapshotSuccess sequence on a Follower behavior with replicatedToAllIndex = 1, and
 * checks that deleteSnapshots and deleteMessages are called on the mocked provider and that
 * only indexes 2-4 remain in the replicated log.
 */
752 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
753 new JavaTestKit(getSystem()) {
755 String persistenceId = factory.generateActorId("leader-");
757 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
759 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
761 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
763 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
764 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
766 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
768 mockRaftActor.waitForInitializeBehaviorComplete();
770 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
771 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
772 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
773 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
774 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
776 ByteString snapshotBytes = fromObject(Arrays.asList(
777 new MockRaftActorContext.MockPayload("A"),
778 new MockRaftActorContext.MockPayload("B"),
779 new MockRaftActorContext.MockPayload("C"),
780 new MockRaftActorContext.MockPayload("D")));
782 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
783 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
785 long replicatedToAllIndex = 1;
786 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
788 verify(mockRaftActor.delegate).createSnapshot();
790 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
792 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
794 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
796 verify(dataPersistenceProvider).deleteMessages(100);
798 assertEquals(3, mockRaftActor.getReplicatedLog().size());
799 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
// Indexes 2, 3 and 4 are still present after the snapshot.
801 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
802 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
803 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
805 // Indexes 0 and 1 will not be in the log because they were removed due to snapshotting
806 assertNull(mockRaftActor.getReplicatedLog().get(1));
807 assertNull(mockRaftActor.getReplicatedLog().get(0));
/**
 * Sends an ApplyState command and verifies that applyState is forwarded to the MockRaftActor
 * delegate with the same client actor and identifier.
 */
814 public void testApplyState() throws Exception {
816 new JavaTestKit(getSystem()) {
818 String persistenceId = factory.generateActorId("leader-");
820 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
822 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
824 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
826 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
827 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
829 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
831 mockRaftActor.waitForInitializeBehaviorComplete();
833 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
834 new MockRaftActorContext.MockPayload("F"));
// The actor's own ref is used as the client actor of the ApplyState message.
836 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
838 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
/**
 * Sends ApplySnapshot with a mocked Snapshot (stubbed state bytes and lastAppliedIndex 3) and
 * checks that applySnapshot is forwarded to the delegate, that the actor's replicated log is a
 * different, empty instance afterwards, and that lastApplied equals 3.
 */
845 public void testApplySnapshot() throws Exception {
846 new JavaTestKit(getSystem()) {
848 String persistenceId = factory.generateActorId("leader-");
850 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
852 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
854 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
856 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
857 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
859 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
861 mockRaftActor.waitForInitializeBehaviorComplete();
// Keep a reference to the current log so the identity check below can detect replacement.
863 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
865 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
866 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
867 oldReplicatedLog.append(
868 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
869 mock(Payload.class)));
871 ByteString snapshotBytes = fromObject(Arrays.asList(
872 new MockRaftActorContext.MockPayload("A"),
873 new MockRaftActorContext.MockPayload("B"),
874 new MockRaftActorContext.MockPayload("C"),
875 new MockRaftActorContext.MockPayload("D")));
877 Snapshot snapshot = mock(Snapshot.class);
879 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
881 doReturn(3L).when(snapshot).getLastAppliedIndex();
883 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
885 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
887 assertTrue("The replicatedLog should have changed",
888 oldReplicatedLog != mockRaftActor.getReplicatedLog());
890 assertEquals("lastApplied should be same as in the snapshot",
891 (Long) 3L, mockRaftActor.getLastApplied());
893 assertEquals(0, mockRaftActor.getReplicatedLog().size());
// Verifies that a SaveSnapshotFailure received after CaptureSnapshot/CaptureSnapshotReply
// leaves the replicated log's snapshot index at -1.
900 public void testSaveSnapshotFailure() throws Exception {
901 new JavaTestKit(getSystem()) {
903 String persistenceId = factory.generateActorId("leader-");
905 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day heartbeat interval; presumably keeps timer-driven traffic out of the test - TODO confirm.
907 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
909 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
// Actor under test, created with no peers.
911 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
912 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
914 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
916 mockRaftActor.waitForInitializeBehaviorComplete();
// Serialized list of four payloads used as the captured snapshot data.
918 ByteString snapshotBytes = fromObject(Arrays.asList(
919 new MockRaftActorContext.MockPayload("A"),
920 new MockRaftActorContext.MockPayload("B"),
921 new MockRaftActorContext.MockPayload("C"),
922 new MockRaftActorContext.MockPayload("D")));
924 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
// Force the actor into the Leader behavior before driving the snapshot messages.
926 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
// Drive the snapshot sequence by calling the command handler directly:
// capture request, capture reply carrying the bytes, then the save failure.
928 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
930 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// NOTE(review): the remaining argument(s) of this SaveSnapshotFailure call are on a line not shown in this excerpt.
932 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
935 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
936 mockRaftActor.getReplicatedLog().getSnapshotIndex());
// Verifies that a MockRaftActor created with a notifier actor sends it three RoleChanged
// messages, in order: null -> Follower, Follower -> Candidate, Candidate -> Leader.
943 public void testRaftRoleChangeNotifier() throws Exception {
944 new JavaTestKit(getSystem()) {{
// Collector actor that receives the role-change notifications.
945 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
946 MessageCollectorActor.waitUntilReady(notifierActor);
// Unlike the other tests here, this one uses a short (100 ms) heartbeat interval and an
// election timeout factor of 1; presumably so the actor elects itself quickly - TODO confirm.
948 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
949 long heartBeatInterval = 100;
950 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
951 config.setElectionTimeoutFactor(1);
953 String persistenceId = factory.generateActorId("notifier-");
// Actor under test, created with no peers and wired to the notifier.
955 factory.createTestActor(MockRaftActor.props(persistenceId,
956 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
// Poll the collector for at most 5000 / heartBeatInterval iterations, sleeping
// heartBeatInterval milliseconds after each unsuccessful check.
958 List<RoleChanged> matches = null;
959 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
960 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
961 assertNotNull(matches);
// NOTE(review): the body of this if is not shown in this excerpt; presumably it leaves the loop.
962 if(matches.size() == 3) {
965 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
968 assertEquals(3, matches.size());
970 // check if the notifier got a role change from null to Follower
971 RoleChanged raftRoleChanged = matches.get(0);
972 assertEquals(persistenceId, raftRoleChanged.getMemberId());
973 assertNull(raftRoleChanged.getOldRole());
974 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
976 // check if the notifier got a role change from Follower to Candidate
977 raftRoleChanged = matches.get(1);
978 assertEquals(persistenceId, raftRoleChanged.getMemberId());
979 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
980 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
982 // check if the notifier got a role change from Candidate to Leader
983 raftRoleChanged = matches.get(2);
984 assertEquals(persistenceId, raftRoleChanged.getMemberId());
985 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
986 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
// Leader scenario: while a capture is flagged as in progress, follower AppendEntriesReply
// messages are expected to leave the log size untouched; the CaptureSnapshotReply is expected
// to shrink the log, and a later AppendEntriesReply is expected to shrink it further.
991 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
992 new JavaTestKit(getSystem()) {
994 String persistenceId = factory.generateActorId("leader-");
995 String follower1Id = factory.generateActorId("follower-");
// Single follower, represented by a message-collecting actor.
997 ActorRef followerActor1 =
998 factory.createActor(Props.create(MessageCollectorActor.class));
// One-day heartbeat and isolated-leader-check intervals; presumably keeps timers out of the test - TODO confirm.
1000 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1001 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1002 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Persistence is a Mockito mock in this test.
1004 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1006 Map<String, String> peerAddresses = new HashMap<>();
1007 peerAddresses.put(follower1Id, followerActor1.path().toString());
1009 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1010 MockRaftActor.props(persistenceId, peerAddresses,
1011 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1013 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Start with commitIndex = lastApplied = 4 and term 1 voted for this actor.
1015 leaderActor.getRaftActorContext().setCommitIndex(4);
1016 leaderActor.getRaftActorContext().setLastApplied(4);
1017 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1019 leaderActor.waitForInitializeBehaviorComplete();
1021 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
// Force the Leader behavior before installing the log.
1023 Leader leader = new Leader(leaderActor.getRaftActorContext());
1024 leaderActor.setCurrentBehavior(leader);
1025 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1027 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1028 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1030 assertEquals(8, leaderActor.getReplicatedLog().size());
// Request a snapshot capture by calling the command handler directly.
1032 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
// The test sets the capture-initiated flag on the context itself.
1034 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1035 verify(leaderActor.delegate).createSnapshot();
1037 assertEquals(8, leaderActor.getReplicatedLog().size());
1039 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1040 //fake snapshot on index 5
1041 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
// Log size is expected to be unchanged after the reply.
1043 assertEquals(8, leaderActor.getReplicatedLog().size());
1045 //fake snapshot on index 6
1046 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1047 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1048 assertEquals(8, leaderActor.getReplicatedLog().size());
1050 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1052 assertEquals(8, leaderActor.getReplicatedLog().size());
// Serialized list of five payloads delivered as the capture reply.
1054 ByteString snapshotBytes = fromObject(Arrays.asList(
1055 new MockRaftActorContext.MockPayload("foo-0"),
1056 new MockRaftActorContext.MockPayload("foo-1"),
1057 new MockRaftActorContext.MockPayload("foo-2"),
1058 new MockRaftActorContext.MockPayload("foo-3"),
1059 new MockRaftActorContext.MockPayload("foo-4")));
1060 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// The capture-initiated flag must be cleared once the reply has been handled.
1061 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1063 // capture snapshot reply should remove the snapshotted entries only
1064 assertEquals(3, leaderActor.getReplicatedLog().size());
1065 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1067 // add another non-replicated entry
1068 leaderActor.getReplicatedLog().append(
1069 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1071 // fake snapshot on index 7; since lastApplied = 7, we would keep the last applied entry
1072 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1073 assertEquals(2, leaderActor.getReplicatedLog().size());
1074 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
// Follower scenario: AppendEntries received while a capture is flagged as in progress are
// expected to grow the log; the CaptureSnapshotReply is expected to shrink it to three
// entries, and a further AppendEntries is expected to leave two entries.
1081 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1082 new JavaTestKit(getSystem()) {
1084 String persistenceId = factory.generateActorId("follower-");
1085 String leaderId = factory.generateActorId("leader-");
// The leader peer is represented by a message-collecting actor.
1088 ActorRef leaderActor1 =
1089 factory.createActor(Props.create(MessageCollectorActor.class));
// One-day heartbeat and isolated-leader-check intervals; presumably keeps timers out of the test - TODO confirm.
1091 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1092 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1093 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Persistence is a Mockito mock in this test.
1095 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1097 Map<String, String> peerAddresses = new HashMap<>();
1098 peerAddresses.put(leaderId, leaderActor1.path().toString());
1100 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1101 MockRaftActor.props(persistenceId, peerAddresses,
1102 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1104 MockRaftActor followerActor = mockActorRef.underlyingActor();
// Start with commitIndex = lastApplied = 4 and term 1.
1105 followerActor.getRaftActorContext().setCommitIndex(4);
1106 followerActor.getRaftActorContext().setLastApplied(4);
1107 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1109 followerActor.waitForInitializeBehaviorComplete();
// Force the Follower behavior.
1112 Follower follower = new Follower(followerActor.getRaftActorContext());
1113 followerActor.setCurrentBehavior(follower);
1114 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1116 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1117 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1118 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1120 // log has indices 0-5
1121 assertEquals(6, followerActor.getReplicatedLog().size());
// Request a snapshot capture by calling the command handler directly.
1124 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
// The test sets the capture-initiated flag on the context itself.
1126 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1127 verify(followerActor.delegate).createSnapshot();
1129 assertEquals(6, followerActor.getReplicatedLog().size());
1131 //fake snapshot on index 6
// NOTE(review): part of the entries initializer below is on a line not shown in this excerpt.
1132 List<ReplicatedLogEntry> entries =
1134 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1135 new MockRaftActorContext.MockPayload("foo-6"))
1137 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
// The appended entry is expected to grow the log from 6 to 7.
1138 assertEquals(7, followerActor.getReplicatedLog().size());
1140 //fake snapshot on index 7
1141 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1145 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1146 new MockRaftActorContext.MockPayload("foo-7"))
1148 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1149 assertEquals(8, followerActor.getReplicatedLog().size());
1151 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
// Serialized list of five payloads delivered as the capture reply.
1154 ByteString snapshotBytes = fromObject(Arrays.asList(
1155 new MockRaftActorContext.MockPayload("foo-0"),
1156 new MockRaftActorContext.MockPayload("foo-1"),
1157 new MockRaftActorContext.MockPayload("foo-2"),
1158 new MockRaftActorContext.MockPayload("foo-3"),
1159 new MockRaftActorContext.MockPayload("foo-4")));
1160 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// The capture-initiated flag must be cleared once the reply has been handled.
1161 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1163 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1164 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1165 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// NOTE(review): this entry is created with index 8 but its payload label is still "foo-7";
// looks like a copy/paste leftover. None of the assertions visible here inspect the payload.
1169 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1170 new MockRaftActorContext.MockPayload("foo-7"))
1172 // send an additional entry 8 with leaderCommit = 7
1173 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1175 // entries 7 and 8 remain, as lastApplied is 7
1176 assertEquals(2, followerActor.getReplicatedLog().size());
// Leader scenario with two followers: the log is expected to stay at five entries through
// the follower replies and the heartbeat, to be emptied by the CaptureSnapshotReply, and to
// stay empty when a further follower reply arrives afterwards.
1183 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1184 new JavaTestKit(getSystem()) {
1186 String persistenceId = factory.generateActorId("leader-");
1187 String follower1Id = factory.generateActorId("follower-");
1188 String follower2Id = factory.generateActorId("follower-");
// Both followers are message-collecting actors registered under their generated ids.
1190 ActorRef followerActor1 =
1191 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1192 ActorRef followerActor2 =
1193 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
// One-day heartbeat and isolated-leader-check intervals; presumably keeps timers out of the test - TODO confirm.
1195 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1196 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1197 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Persistence is a Mockito mock in this test.
1199 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1201 Map<String, String> peerAddresses = new HashMap<>();
1202 peerAddresses.put(follower1Id, followerActor1.path().toString());
1203 peerAddresses.put(follower2Id, followerActor2.path().toString());
1205 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1206 MockRaftActor.props(persistenceId, peerAddresses,
1207 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1209 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Start with commitIndex = lastApplied = 9 and term 1 voted for this actor.
1210 leaderActor.getRaftActorContext().setCommitIndex(9);
1211 leaderActor.getRaftActorContext().setLastApplied(9);
1212 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1214 leaderActor.waitForInitializeBehaviorComplete();
// Force the Leader behavior.
1216 Leader leader = new Leader(leaderActor.getRaftActorContext());
1217 leaderActor.setCurrentBehavior(leader);
1218 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1220 // create 5 entries in the log
1221 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1222 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1224 // set the snapshot index to 4; 0 to 4 are snapshotted
1225 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1226 //setting replicatedToAllIndex = 9, for the log to clear
1227 leader.setReplicatedToAllIndex(9);
1228 assertEquals(5, leaderActor.getReplicatedLog().size());
1229 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Reply from the first follower; log size is expected to be unchanged.
1231 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1232 assertEquals(5, leaderActor.getReplicatedLog().size());
1233 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1235 // set the 2nd follower nextIndex to 1 which has been snapshotted
1236 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1237 assertEquals(5, leaderActor.getReplicatedLog().size());
1238 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1240 // simulate a real snapshot
1241 leaderActor.onReceiveCommand(new SendHeartBeat());
1242 assertEquals(5, leaderActor.getReplicatedLog().size());
// The failure message reports the actual state and the current leader id.
1243 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1244 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1245 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1248 //reply from a slow follower does not initiate a fake snapshot
1249 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1250 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
// Serialized list of five payloads delivered as the capture reply.
1252 ByteString snapshotBytes = fromObject(Arrays.asList(
1253 new MockRaftActorContext.MockPayload("foo-0"),
1254 new MockRaftActorContext.MockPayload("foo-1"),
1255 new MockRaftActorContext.MockPayload("foo-2"),
1256 new MockRaftActorContext.MockPayload("foo-3"),
1257 new MockRaftActorContext.MockPayload("foo-4")));
1258 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// The capture-initiated flag must be cleared once the reply has been handled.
1259 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1261 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1263 // a late reply from the slow follower after the snapshot should not raise errors
1264 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1265 assertEquals(0, leaderActor.getReplicatedLog().size());
// Test-only DataPersistenceProvider implementation; the testRealSnapshot* tests in this file
// pass an instance of it to MockRaftActor.props instead of a Mockito mock.
// NOTE(review): only the method signatures and the catch clause of persist are shown in
// this excerpt, so the behavior of each method should be confirmed against the full file.
1271 private static class NonPersistentProvider implements DataPersistenceProvider {
1273 public boolean isRecoveryApplicable() {
1278 public <T> void persist(T o, Procedure<T> procedure) {
// NOTE(review): any Exception raised in persist's try block is caught here and printed via
// printStackTrace; as far as this excerpt shows it is not rethrown, so such a failure
// may go unnoticed by the calling test - confirm that this is intended.
1281 } catch (Exception e) {
1282 e.printStackTrace();
1287 public void saveSnapshot(Object o) {
1292 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1297 public void deleteMessages(long sequenceNumber) {
// Peerless leader with four appended log entries and the leader's replicatedToAllIndex
// expected to be -1: after persisting one more entry and delivering a CaptureSnapshotReply,
// the snapshot index and replicatedToAllIndex are both expected to still be -1 and the
// capture-initiated flag to be cleared.
1303 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1304 new JavaTestKit(getSystem()) {{
1305 String persistenceId = factory.generateActorId("leader-");
// One-day heartbeat and isolated-leader-check intervals; presumably keeps timers out of the test - TODO confirm.
1306 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1307 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1308 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Snapshot batch count of 5; the test appends four entries and then persists a fifth.
1309 config.setSnapshotBatchCount(5);
1311 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
// No peers are configured.
1313 Map<String, String> peerAddresses = new HashMap<>();
1315 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1316 MockRaftActor.props(persistenceId, peerAddresses,
1317 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1319 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Start with commitIndex = lastApplied = 3 and term 1 voted for this actor.
1320 leaderActor.getRaftActorContext().setCommitIndex(3);
1321 leaderActor.getRaftActorContext().setLastApplied(3);
1322 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1324 leaderActor.waitForInitializeBehaviorComplete();
// Append four entries (i = 0..3) directly to the replicated log.
1325 for(int i=0;i< 4;i++) {
1326 leaderActor.getReplicatedLog()
1327 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1328 new MockRaftActorContext.MockPayload("A")));
// Force the Leader behavior.
1331 Leader leader = new Leader(leaderActor.getRaftActorContext());
1332 leaderActor.setCurrentBehavior(leader);
1333 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1335 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1336 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1338 // Now send a CaptureSnapshotReply
1339 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1341 // Trimming log in this scenario is a no-op
1342 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1343 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1344 assertEquals(-1, leader.getReplicatedToAllIndex());
// Peerless leader whose snapshot index and replicatedToAllIndex are both preset to 3 and
// whose log receives no explicit entries before the persist: after persisting one entry and
// delivering a CaptureSnapshotReply, both indexes are expected to still be 3 and the
// capture-initiated flag to be cleared.
1350 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1351 new JavaTestKit(getSystem()) {{
1352 String persistenceId = factory.generateActorId("leader-");
// One-day heartbeat and isolated-leader-check intervals; presumably keeps timers out of the test - TODO confirm.
1353 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1354 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1355 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1356 config.setSnapshotBatchCount(5);
1358 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
// No peers are configured.
1360 Map<String, String> peerAddresses = new HashMap<>();
1362 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1363 MockRaftActor.props(persistenceId, peerAddresses,
1364 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1366 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Start with commitIndex = lastApplied = snapshotIndex = 3 and term 1 voted for this actor.
1367 leaderActor.getRaftActorContext().setCommitIndex(3);
1368 leaderActor.getRaftActorContext().setLastApplied(3);
1369 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1370 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1372 leaderActor.waitForInitializeBehaviorComplete();
// Force the Leader behavior and preset its replicatedToAllIndex to 3.
1373 Leader leader = new Leader(leaderActor.getRaftActorContext());
1374 leaderActor.setCurrentBehavior(leader);
1375 leader.setReplicatedToAllIndex(3);
1376 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1378 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1379 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1381 // Now send a CaptureSnapshotReply
1382 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1384 // Trimming log in this scenario is a no-op
1385 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1386 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1387 assertEquals(3, leader.getReplicatedToAllIndex());
1392 private ByteString fromObject(Object snapshot) throws Exception {
1393 ByteArrayOutputStream b = null;
1394 ObjectOutputStream o = null;
1396 b = new ByteArrayOutputStream();
1397 o = new ObjectOutputStream(b);
1398 o.writeObject(snapshot);
1399 byte[] snapshotBytes = b.toByteArray();
1400 return ByteString.copyFrom(snapshotBytes);