1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.Nonnull;
53 import org.junit.After;
54 import org.junit.Assert;
55 import org.junit.Before;
56 import org.junit.Test;
57 import org.opendaylight.controller.cluster.DataPersistenceProvider;
58 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
59 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
60 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
61 import org.opendaylight.controller.cluster.notifications.RoleChanged;
62 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
67 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
69 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
70 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
71 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
72 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
73 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
74 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
75 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
78 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import scala.concurrent.Await;
81 import scala.concurrent.Future;
82 import scala.concurrent.duration.Duration;
83 import scala.concurrent.duration.FiniteDuration;
85 public class RaftActorTest extends AbstractActorTest {
87 private TestActorFactory factory;
91 factory = new TestActorFactory(getSystem());
95 public void tearDown() throws Exception {
97 InMemoryJournal.clear();
98 InMemorySnapshotStore.clear();
101 public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
103 private final RaftActor actorDelegate;
104 private final RaftActorRecoveryCohort cohortDelegate;
105 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
106 private final List<Object> state;
107 private ActorRef roleChangeNotifier;
108 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
110 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
111 private static final long serialVersionUID = 1L;
112 private final Map<String, String> peerAddresses;
113 private final String id;
114 private final Optional<ConfigParams> config;
115 private final DataPersistenceProvider dataPersistenceProvider;
116 private final ActorRef roleChangeNotifier;
118 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
119 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
120 ActorRef roleChangeNotifier) {
121 this.peerAddresses = peerAddresses;
123 this.config = config;
124 this.dataPersistenceProvider = dataPersistenceProvider;
125 this.roleChangeNotifier = roleChangeNotifier;
129 public MockRaftActor create() throws Exception {
130 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
131 dataPersistenceProvider);
132 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
133 return mockRaftActor;
137 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
138 DataPersistenceProvider dataPersistenceProvider) {
139 super(id, peerAddresses, config);
140 state = new ArrayList<>();
141 this.actorDelegate = mock(RaftActor.class);
142 this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
143 if(dataPersistenceProvider == null){
144 setPersistence(true);
146 setPersistence(dataPersistenceProvider);
150 public void waitForRecoveryComplete() {
152 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
153 } catch (InterruptedException e) {
158 public void waitForInitializeBehaviorComplete() {
160 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
161 } catch (InterruptedException e) {
167 public void waitUntilLeader(){
168 for(int i = 0;i < 10; i++){
172 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
176 public List<Object> getState() {
180 public static Props props(final String id, final Map<String, String> peerAddresses,
181 Optional<ConfigParams> config){
182 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
185 public static Props props(final String id, final Map<String, String> peerAddresses,
186 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
187 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
190 public static Props props(final String id, final Map<String, String> peerAddresses,
191 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
192 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
195 public static Props props(final String id, final Map<String, String> peerAddresses,
196 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
197 DataPersistenceProvider dataPersistenceProvider){
198 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
202 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
203 actorDelegate.applyState(clientActor, identifier, data);
204 LOG.info("{}: applyState called", persistenceId());
209 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
214 public void startLogRecoveryBatch(int maxBatchSize) {
218 public void appendRecoveredLogEntry(Payload data) {
223 public void applyCurrentLogRecoveryBatch() {
227 protected void onRecoveryComplete() {
228 actorDelegate.onRecoveryComplete();
229 recoveryComplete.countDown();
233 protected void initializeBehavior() {
234 super.initializeBehavior();
235 initializeBehaviorComplete.countDown();
239 public void applyRecoverySnapshot(byte[] bytes) {
240 cohortDelegate.applyRecoverySnapshot(bytes);
242 Object data = toObject(bytes);
243 if (data instanceof List) {
244 state.addAll((List<?>) data);
246 } catch (Exception e) {
251 @Override protected void createSnapshot() {
252 LOG.info("{}: createSnapshot called", persistenceId());
253 actorDelegate.createSnapshot();
256 @Override protected void applySnapshot(byte [] snapshot) {
257 LOG.info("{}: applySnapshot called", persistenceId());
258 actorDelegate.applySnapshot(snapshot);
261 @Override protected void onStateChanged() {
262 actorDelegate.onStateChanged();
266 protected Optional<ActorRef> getRoleChangeNotifier() {
267 return Optional.fromNullable(roleChangeNotifier);
270 @Override public String persistenceId() {
274 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
276 ByteArrayInputStream bis = null;
277 ObjectInputStream ois = null;
279 bis = new ByteArrayInputStream(bs);
280 ois = new ObjectInputStream(bis);
281 obj = ois.readObject();
293 public ReplicatedLog getReplicatedLog(){
294 return this.getRaftActorContext().getReplicatedLog();
300 public static class RaftActorTestKit extends JavaTestKit {
301 private final ActorRef raftActor;
303 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
306 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
307 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
312 public ActorRef getRaftActor() {
316 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
317 // Wait for a specific log message to show up
319 new JavaTestKit.EventFilter<Boolean>(logEventClass
322 protected Boolean run() {
325 }.from(raftActor.path().toString())
327 .occurrences(1).exec();
332 protected void waitUntilLeader(){
333 waitUntilLeader(raftActor);
336 public static void waitUntilLeader(ActorRef actorRef) {
337 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
338 for(int i = 0; i < 20 * 5; i++) {
339 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
341 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
342 if(resp.getLeaderActor() != null) {
345 } catch(TimeoutException e) {
346 } catch(Exception e) {
347 System.err.println("FindLeader threw ex");
352 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
355 Assert.fail("Leader not found for actorRef " + actorRef.path());
362 public void testConstruction() {
363 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
367 public void testFindLeaderWhenLeaderIsSelf(){
368 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
369 kit.waitUntilLeader();
373 public void testRaftActorRecovery() throws Exception {
374 new JavaTestKit(getSystem()) {{
375 String persistenceId = factory.generateActorId("follower-");
377 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
378 // Set the heartbeat interval high to essentially disable election otherwise the test
379 // may fail if the actor is switched to Leader and the commitIndex is set to the last
381 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
383 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
384 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
386 watch(followerActor);
388 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
389 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
390 new MockRaftActorContext.MockPayload("E"));
391 snapshotUnappliedEntries.add(entry1);
393 int lastAppliedDuringSnapshotCapture = 3;
394 int lastIndexDuringSnapshotCapture = 4;
396 // 4 messages as part of snapshot, which are applied to state
397 ByteString snapshotBytes = fromObject(Arrays.asList(
398 new MockRaftActorContext.MockPayload("A"),
399 new MockRaftActorContext.MockPayload("B"),
400 new MockRaftActorContext.MockPayload("C"),
401 new MockRaftActorContext.MockPayload("D")));
403 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
404 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
405 lastAppliedDuringSnapshotCapture, 1);
406 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
408 // add more entries after snapshot is taken
409 List<ReplicatedLogEntry> entries = new ArrayList<>();
410 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
411 new MockRaftActorContext.MockPayload("F"));
412 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
413 new MockRaftActorContext.MockPayload("G"));
414 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
415 new MockRaftActorContext.MockPayload("H"));
420 int lastAppliedToState = 5;
423 InMemoryJournal.addEntry(persistenceId, 5, entry2);
424 // 2 entries are applied to state besides the 4 entries in snapshot
425 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
426 InMemoryJournal.addEntry(persistenceId, 7, entry3);
427 InMemoryJournal.addEntry(persistenceId, 8, entry4);
430 followerActor.tell(PoisonPill.getInstance(), null);
431 expectMsgClass(duration("5 seconds"), Terminated.class);
433 unwatch(followerActor);
435 //reinstate the actor
436 TestActorRef<MockRaftActor> ref = factory.createTestActor(
437 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
438 Optional.<ConfigParams>of(config)));
440 ref.underlyingActor().waitForRecoveryComplete();
442 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
443 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
444 context.getReplicatedLog().size());
445 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
446 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
447 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
448 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
453 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
454 new JavaTestKit(getSystem()) {{
455 String persistenceId = factory.generateActorId("leader-");
457 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
458 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
460 // Setup the persisted journal with some entries
461 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
462 new MockRaftActorContext.MockPayload("zero"));
463 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
464 new MockRaftActorContext.MockPayload("oen"));
465 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
466 new MockRaftActorContext.MockPayload("two"));
469 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
470 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
471 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
472 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
474 int lastAppliedToState = 1;
477 //reinstate the actor
478 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
479 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
480 Optional.<ConfigParams>of(config)));
482 leaderActor.underlyingActor().waitForRecoveryComplete();
484 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
485 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
486 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
487 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
488 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
493 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
494 * process recovery messages
500 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
501 new JavaTestKit(getSystem()) {
503 String persistenceId = factory.generateActorId("leader-");
505 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
507 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
509 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
510 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
512 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
514 // Wait for akka's recovery to complete so it doesn't interfere.
515 mockRaftActor.waitForRecoveryComplete();
517 ByteString snapshotBytes = fromObject(Arrays.asList(
518 new MockRaftActorContext.MockPayload("A"),
519 new MockRaftActorContext.MockPayload("B"),
520 new MockRaftActorContext.MockPayload("C"),
521 new MockRaftActorContext.MockPayload("D")));
523 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
524 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
526 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
528 verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
530 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
532 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
534 assertEquals("add replicated log entry", 1, replicatedLog.size());
536 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
538 assertEquals("add replicated log entry", 2, replicatedLog.size());
540 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
542 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
544 // The snapshot had 4 items + we added 2 more items during the test
545 // We start removing from 5 and we should get 1 item in the replicated log
546 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
548 assertEquals("remove log entries", 1, replicatedLog.size());
550 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
552 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
553 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
555 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
561 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
562 * not process recovery messages
567 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
568 new JavaTestKit(getSystem()) {
570 String persistenceId = factory.generateActorId("leader-");
572 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
574 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
576 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
577 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
579 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
581 // Wait for akka's recovery to complete so it doesn't interfere.
582 mockRaftActor.waitForRecoveryComplete();
584 ByteString snapshotBytes = fromObject(Arrays.asList(
585 new MockRaftActorContext.MockPayload("A"),
586 new MockRaftActorContext.MockPayload("B"),
587 new MockRaftActorContext.MockPayload("C"),
588 new MockRaftActorContext.MockPayload("D")));
590 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
591 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
593 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
595 verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
597 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
599 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
601 assertEquals("add replicated log entry", 0, replicatedLog.size());
603 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
605 assertEquals("add replicated log entry", 0, replicatedLog.size());
607 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
609 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
611 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
613 assertEquals("remove log entries", 0, replicatedLog.size());
615 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
617 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
618 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
620 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
626 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
627 new JavaTestKit(getSystem()) {
629 String persistenceId = factory.generateActorId("leader-");
631 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
633 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
635 CountDownLatch persistLatch = new CountDownLatch(1);
636 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
637 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
639 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
640 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
642 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
644 mockRaftActor.waitForInitializeBehaviorComplete();
646 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
648 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
654 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
655 new JavaTestKit(getSystem()) {
657 String persistenceId = factory.generateActorId("leader-");
659 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
661 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
663 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
665 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
666 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
668 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
670 mockRaftActor.waitForInitializeBehaviorComplete();
672 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
674 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
676 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
682 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
683 new JavaTestKit(getSystem()) {
685 String persistenceId = factory.generateActorId("leader-");
687 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
689 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
691 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
693 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
694 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
696 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
698 mockRaftActor.waitForInitializeBehaviorComplete();
700 mockRaftActor.waitUntilLeader();
702 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
704 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
706 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
712 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
713 new JavaTestKit(getSystem()) {
715 String persistenceId = factory.generateActorId("leader-");
717 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
719 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
721 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
723 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
724 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
726 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
728 mockRaftActor.waitForInitializeBehaviorComplete();
730 mockRaftActor.waitUntilLeader();
732 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
734 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
742 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
743 new JavaTestKit(getSystem()) {
745 String persistenceId = factory.generateActorId("leader-");
747 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
749 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
751 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
753 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
754 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
755 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
757 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
759 mockRaftActor.waitForInitializeBehaviorComplete();
761 ByteString snapshotBytes = fromObject(Arrays.asList(
762 new MockRaftActorContext.MockPayload("A"),
763 new MockRaftActorContext.MockPayload("B"),
764 new MockRaftActorContext.MockPayload("C"),
765 new MockRaftActorContext.MockPayload("D")));
767 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
769 raftActorContext.getSnapshotManager().capture(
770 new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
771 new MockRaftActorContext.MockPayload("D")), -1);
773 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
775 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
777 verify(dataPersistenceProvider).saveSnapshot(anyObject());
784 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
785 new JavaTestKit(getSystem()) {
787 String persistenceId = factory.generateActorId("leader-");
789 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
791 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
793 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
795 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
796 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
798 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
800 mockRaftActor.waitForInitializeBehaviorComplete();
801 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
803 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
804 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
805 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
806 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
807 mockRaftActor.getReplicatedLog().append(lastEntry);
809 ByteString snapshotBytes = fromObject(Arrays.asList(
810 new MockRaftActorContext.MockPayload("A"),
811 new MockRaftActorContext.MockPayload("B"),
812 new MockRaftActorContext.MockPayload("C"),
813 new MockRaftActorContext.MockPayload("D")));
815 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
816 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
818 long replicatedToAllIndex = 1;
820 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
822 verify(mockRaftActor.actorDelegate).createSnapshot();
824 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
826 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
828 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
830 verify(dataPersistenceProvider).deleteMessages(100);
832 assertEquals(3, mockRaftActor.getReplicatedLog().size());
833 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
835 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
836 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
837 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
839 // Index 2 will not be in the log because it was removed due to snapshotting
840 assertNull(mockRaftActor.getReplicatedLog().get(1));
841 assertNull(mockRaftActor.getReplicatedLog().get(0));
848 public void testApplyState() throws Exception {
850 new JavaTestKit(getSystem()) {
852 String persistenceId = factory.generateActorId("leader-");
854 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
856 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
858 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
860 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
861 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
863 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
865 mockRaftActor.waitForInitializeBehaviorComplete();
867 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
868 new MockRaftActorContext.MockPayload("F"));
870 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
872 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
879 public void testApplySnapshot() throws Exception {
880 new JavaTestKit(getSystem()) {
882 String persistenceId = factory.generateActorId("leader-");
884 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
886 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
888 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
890 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
891 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
893 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
895 mockRaftActor.waitForInitializeBehaviorComplete();
897 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
899 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
900 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
901 oldReplicatedLog.append(
902 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
903 mock(Payload.class)));
905 ByteString snapshotBytes = fromObject(Arrays.asList(
906 new MockRaftActorContext.MockPayload("A"),
907 new MockRaftActorContext.MockPayload("B"),
908 new MockRaftActorContext.MockPayload("C"),
909 new MockRaftActorContext.MockPayload("D")));
911 Snapshot snapshot = mock(Snapshot.class);
913 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
915 doReturn(3L).when(snapshot).getLastAppliedIndex();
917 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
919 verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
921 assertTrue("The replicatedLog should have changed",
922 oldReplicatedLog != mockRaftActor.getReplicatedLog());
924 assertEquals("lastApplied should be same as in the snapshot",
925 (Long) 3L, mockRaftActor.getLastApplied());
927 assertEquals(0, mockRaftActor.getReplicatedLog().size());
934 public void testSaveSnapshotFailure() throws Exception {
935 new JavaTestKit(getSystem()) {
937 String persistenceId = factory.generateActorId("leader-");
939 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
941 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
943 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
945 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
946 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
948 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
950 mockRaftActor.waitForInitializeBehaviorComplete();
952 ByteString snapshotBytes = fromObject(Arrays.asList(
953 new MockRaftActorContext.MockPayload("A"),
954 new MockRaftActorContext.MockPayload("B"),
955 new MockRaftActorContext.MockPayload("C"),
956 new MockRaftActorContext.MockPayload("D")));
958 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
960 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
962 raftActorContext.getSnapshotManager().capture(
963 new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
964 new MockRaftActorContext.MockPayload("D")), 1);
966 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
968 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
971 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
972 mockRaftActor.getReplicatedLog().getSnapshotIndex());
979 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
980 new JavaTestKit(getSystem()) {{
981 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
982 Props.create(MessageCollectorActor.class));
983 MessageCollectorActor.waitUntilReady(notifierActor);
985 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
986 long heartBeatInterval = 100;
987 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
988 config.setElectionTimeoutFactor(20);
990 String persistenceId = factory.generateActorId("notifier-");
992 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
993 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
994 new NonPersistentDataProvider()), persistenceId);
996 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
999 // check if the notifier got a role change from null to Follower
1000 RoleChanged raftRoleChanged = matches.get(0);
1001 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1002 assertNull(raftRoleChanged.getOldRole());
1003 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1005 // check if the notifier got a role change from Follower to Candidate
1006 raftRoleChanged = matches.get(1);
1007 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1008 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1009 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1011 // check if the notifier got a role change from Candidate to Leader
1012 raftRoleChanged = matches.get(2);
1013 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1014 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1015 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1017 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1018 notifierActor, LeaderStateChanged.class);
1020 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1022 notifierActor.underlyingActor().clear();
1024 MockRaftActor raftActor = raftActorRef.underlyingActor();
1025 final String newLeaderId = "new-leader";
1026 Follower follower = new Follower(raftActor.getRaftActorContext()) {
1028 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1029 leaderId = newLeaderId;
1034 raftActor.changeCurrentBehavior(follower);
1036 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1037 assertEquals(persistenceId, leaderStateChange.getMemberId());
1038 assertEquals(null, leaderStateChange.getLeaderId());
1040 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1041 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1042 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1044 notifierActor.underlyingActor().clear();
1046 raftActor.handleCommand("any");
1048 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1049 assertEquals(persistenceId, leaderStateChange.getMemberId());
1050 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1055 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1056 new JavaTestKit(getSystem()) {{
1057 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1058 MessageCollectorActor.waitUntilReady(notifierActor);
1060 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1061 long heartBeatInterval = 100;
1062 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1063 config.setElectionTimeoutFactor(1);
1065 String persistenceId = factory.generateActorId("notifier-");
1067 factory.createActor(MockRaftActor.props(persistenceId,
1068 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1070 List<RoleChanged> matches = null;
1071 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1072 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1073 assertNotNull(matches);
1074 if(matches.size() == 3) {
1077 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1080 assertEquals(2, matches.size());
1082 // check if the notifier got a role change from null to Follower
1083 RoleChanged raftRoleChanged = matches.get(0);
1084 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1085 assertNull(raftRoleChanged.getOldRole());
1086 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1088 // check if the notifier got a role change from Follower to Candidate
1089 raftRoleChanged = matches.get(1);
1090 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1091 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1092 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1098 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1099 new JavaTestKit(getSystem()) {
1101 String persistenceId = factory.generateActorId("leader-");
1102 String follower1Id = factory.generateActorId("follower-");
1104 ActorRef followerActor1 =
1105 factory.createActor(Props.create(MessageCollectorActor.class));
1107 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1108 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1109 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1111 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1113 Map<String, String> peerAddresses = new HashMap<>();
1114 peerAddresses.put(follower1Id, followerActor1.path().toString());
1116 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1117 MockRaftActor.props(persistenceId, peerAddresses,
1118 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1120 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1122 leaderActor.getRaftActorContext().setCommitIndex(4);
1123 leaderActor.getRaftActorContext().setLastApplied(4);
1124 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1126 leaderActor.waitForInitializeBehaviorComplete();
1128 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1130 Leader leader = new Leader(leaderActor.getRaftActorContext());
1131 leaderActor.setCurrentBehavior(leader);
1132 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1134 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1135 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1137 assertEquals(8, leaderActor.getReplicatedLog().size());
1139 leaderActor.getRaftActorContext().getSnapshotManager()
1140 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1141 new MockRaftActorContext.MockPayload("x")), 4);
1143 verify(leaderActor.actorDelegate).createSnapshot();
1145 assertEquals(8, leaderActor.getReplicatedLog().size());
1147 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1148 //fake snapshot on index 5
1149 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1151 assertEquals(8, leaderActor.getReplicatedLog().size());
1153 //fake snapshot on index 6
1154 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1155 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1156 assertEquals(8, leaderActor.getReplicatedLog().size());
1158 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1160 assertEquals(8, leaderActor.getReplicatedLog().size());
1162 ByteString snapshotBytes = fromObject(Arrays.asList(
1163 new MockRaftActorContext.MockPayload("foo-0"),
1164 new MockRaftActorContext.MockPayload("foo-1"),
1165 new MockRaftActorContext.MockPayload("foo-2"),
1166 new MockRaftActorContext.MockPayload("foo-3"),
1167 new MockRaftActorContext.MockPayload("foo-4")));
1169 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1170 , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1172 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1174 // The commit is needed to complete the snapshot creation process
1175 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1177 // capture snapshot reply should remove the snapshotted entries only
1178 assertEquals(3, leaderActor.getReplicatedLog().size());
1179 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1181 // add another non-replicated entry
1182 leaderActor.getReplicatedLog().append(
1183 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1185 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1186 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1187 assertEquals(2, leaderActor.getReplicatedLog().size());
1188 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1195 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1196 new JavaTestKit(getSystem()) {
1198 String persistenceId = factory.generateActorId("follower-");
1199 String leaderId = factory.generateActorId("leader-");
1202 ActorRef leaderActor1 =
1203 factory.createActor(Props.create(MessageCollectorActor.class));
1205 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1206 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1207 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1209 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1211 Map<String, String> peerAddresses = new HashMap<>();
1212 peerAddresses.put(leaderId, leaderActor1.path().toString());
1214 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1215 MockRaftActor.props(persistenceId, peerAddresses,
1216 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1218 MockRaftActor followerActor = mockActorRef.underlyingActor();
1219 followerActor.getRaftActorContext().setCommitIndex(4);
1220 followerActor.getRaftActorContext().setLastApplied(4);
1221 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1223 followerActor.waitForInitializeBehaviorComplete();
1226 Follower follower = new Follower(followerActor.getRaftActorContext());
1227 followerActor.setCurrentBehavior(follower);
1228 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1230 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1231 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1232 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1234 // log has indices 0-5
1235 assertEquals(6, followerActor.getReplicatedLog().size());
1238 followerActor.getRaftActorContext().getSnapshotManager().capture(
1239 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1240 new MockRaftActorContext.MockPayload("D")), 4);
1242 verify(followerActor.actorDelegate).createSnapshot();
1244 assertEquals(6, followerActor.getReplicatedLog().size());
1246 //fake snapshot on index 6
1247 List<ReplicatedLogEntry> entries =
1249 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1250 new MockRaftActorContext.MockPayload("foo-6"))
1252 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1253 assertEquals(7, followerActor.getReplicatedLog().size());
1255 //fake snapshot on index 7
1256 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1260 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1261 new MockRaftActorContext.MockPayload("foo-7"))
1263 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1264 assertEquals(8, followerActor.getReplicatedLog().size());
1266 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1269 ByteString snapshotBytes = fromObject(Arrays.asList(
1270 new MockRaftActorContext.MockPayload("foo-0"),
1271 new MockRaftActorContext.MockPayload("foo-1"),
1272 new MockRaftActorContext.MockPayload("foo-2"),
1273 new MockRaftActorContext.MockPayload("foo-3"),
1274 new MockRaftActorContext.MockPayload("foo-4")));
1275 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1276 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1278 // The commit is needed to complete the snapshot creation process
1279 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1281 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1282 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1283 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1287 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1288 new MockRaftActorContext.MockPayload("foo-7"))
1290 // send an additional entry 8 with leaderCommit = 7
1291 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1293 // 7 and 8, as lastapplied is 7
1294 assertEquals(2, followerActor.getReplicatedLog().size());
1301 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1302 new JavaTestKit(getSystem()) {
1304 String persistenceId = factory.generateActorId("leader-");
1305 String follower1Id = factory.generateActorId("follower-");
1306 String follower2Id = factory.generateActorId("follower-");
1308 ActorRef followerActor1 =
1309 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1310 ActorRef followerActor2 =
1311 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1313 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1314 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1315 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1317 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1319 Map<String, String> peerAddresses = new HashMap<>();
1320 peerAddresses.put(follower1Id, followerActor1.path().toString());
1321 peerAddresses.put(follower2Id, followerActor2.path().toString());
1323 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1324 MockRaftActor.props(persistenceId, peerAddresses,
1325 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1327 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1328 leaderActor.getRaftActorContext().setCommitIndex(9);
1329 leaderActor.getRaftActorContext().setLastApplied(9);
1330 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1332 leaderActor.waitForInitializeBehaviorComplete();
1334 Leader leader = new Leader(leaderActor.getRaftActorContext());
1335 leaderActor.setCurrentBehavior(leader);
1336 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1338 // create 5 entries in the log
1339 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1340 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1342 //set the snapshot index to 4 , 0 to 4 are snapshotted
1343 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1344 //setting replicatedToAllIndex = 9, for the log to clear
1345 leader.setReplicatedToAllIndex(9);
1346 assertEquals(5, leaderActor.getReplicatedLog().size());
1347 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1349 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1350 assertEquals(5, leaderActor.getReplicatedLog().size());
1351 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1353 // set the 2nd follower nextIndex to 1 which has been snapshotted
1354 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1355 assertEquals(5, leaderActor.getReplicatedLog().size());
1356 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1358 // simulate a real snapshot
1359 leaderActor.onReceiveCommand(new SendHeartBeat());
1360 assertEquals(5, leaderActor.getReplicatedLog().size());
1361 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1362 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1363 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1366 //reply from a slow follower does not initiate a fake snapshot
1367 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1368 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1370 ByteString snapshotBytes = fromObject(Arrays.asList(
1371 new MockRaftActorContext.MockPayload("foo-0"),
1372 new MockRaftActorContext.MockPayload("foo-1"),
1373 new MockRaftActorContext.MockPayload("foo-2"),
1374 new MockRaftActorContext.MockPayload("foo-3"),
1375 new MockRaftActorContext.MockPayload("foo-4")));
1376 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1377 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1379 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1381 //reply from a slow follower after should not raise errors
1382 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1383 assertEquals(0, leaderActor.getReplicatedLog().size());
1389 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1390 new JavaTestKit(getSystem()) {{
1391 String persistenceId = factory.generateActorId("leader-");
1392 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1393 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1394 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1395 config.setSnapshotBatchCount(5);
1397 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1399 Map<String, String> peerAddresses = new HashMap<>();
1401 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1402 MockRaftActor.props(persistenceId, peerAddresses,
1403 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1405 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1406 leaderActor.getRaftActorContext().setCommitIndex(3);
1407 leaderActor.getRaftActorContext().setLastApplied(3);
1408 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1410 leaderActor.waitForInitializeBehaviorComplete();
1411 for(int i=0;i< 4;i++) {
1412 leaderActor.getReplicatedLog()
1413 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1414 new MockRaftActorContext.MockPayload("A")));
1417 Leader leader = new Leader(leaderActor.getRaftActorContext());
1418 leaderActor.setCurrentBehavior(leader);
1419 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1421 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1422 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1424 // Now send a CaptureSnapshotReply
1425 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1427 // Trimming log in this scenario is a no-op
1428 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1429 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1430 assertEquals(-1, leader.getReplicatedToAllIndex());
1436 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1437 new JavaTestKit(getSystem()) {{
1438 String persistenceId = factory.generateActorId("leader-");
1439 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1440 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1441 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1442 config.setSnapshotBatchCount(5);
1444 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1446 Map<String, String> peerAddresses = new HashMap<>();
1448 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1449 MockRaftActor.props(persistenceId, peerAddresses,
1450 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1452 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1453 leaderActor.getRaftActorContext().setCommitIndex(3);
1454 leaderActor.getRaftActorContext().setLastApplied(3);
1455 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1456 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1458 leaderActor.waitForInitializeBehaviorComplete();
1459 Leader leader = new Leader(leaderActor.getRaftActorContext());
1460 leaderActor.setCurrentBehavior(leader);
1461 leader.setReplicatedToAllIndex(3);
1462 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1464 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1465 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1467 // Now send a CaptureSnapshotReply
1468 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1470 // Trimming log in this scenario is a no-op
1471 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1472 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1473 assertEquals(3, leader.getReplicatedToAllIndex());
1478 private ByteString fromObject(Object snapshot) throws Exception {
1479 ByteArrayOutputStream b = null;
1480 ObjectOutputStream o = null;
1482 b = new ByteArrayOutputStream();
1483 o = new ObjectOutputStream(b);
1484 o.writeObject(snapshot);
1485 byte[] snapshotBytes = b.toByteArray();
1486 return ByteString.copyFrom(snapshotBytes);