1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
58 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RoleChanged;
61 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
66 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
67 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
68 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
69 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
70 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
71 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
72 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
73 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
74 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
75 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
76 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
77 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
78 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
79 import scala.concurrent.Await;
80 import scala.concurrent.Future;
81 import scala.concurrent.duration.Duration;
82 import scala.concurrent.duration.FiniteDuration;
84 public class RaftActorTest extends AbstractActorTest {
86 private TestActorFactory factory;
90 factory = new TestActorFactory(getSystem());
// Per-test cleanup: empties the static InMemoryJournal and InMemorySnapshotStore so that
// persisted entries from one test are not visible to the next.
// NOTE(review): the lines around this method are not visible in this view; any further
// cleanup (e.g. closing the actor factory) cannot be confirmed from here.
94 public void tearDown() throws Exception {
96 InMemoryJournal.clear();
97 InMemorySnapshotStore.clear();
// RaftActor subclass used by the tests in this file. Most callbacks are forwarded to a Mockito
// mock ("delegate") so tests can verify that RaftActor invoked them, and two latches let a test
// block until recovery has completed or the initial behavior has been set up.
100 public static class MockRaftActor extends RaftActor {
// Mockito mock of RaftActor that receives the forwarded callbacks (see the constructor).
102 private final RaftActor delegate;
// Counted down in onRecoveryComplete().
103 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Objects collected from recovery snapshots by applyRecoverySnapshot().
104 private final List<Object> state;
// Optional notifier handed back from getRoleChangeNotifier(); may be null.
105 private ActorRef roleChangeNotifier;
// Counted down after super.initializeBehavior() returns.
106 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Akka Creator that captures the constructor arguments and builds the MockRaftActor when the
// actor is instantiated from Props.
108 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
109 private static final long serialVersionUID = 1L;
110 private final Map<String, String> peerAddresses;
111 private final String id;
112 private final Optional<ConfigParams> config;
113 private final DataPersistenceProvider dataPersistenceProvider;
114 private final ActorRef roleChangeNotifier;
// Private: instances are only created through the MockRaftActor.props(...) overloads below.
116 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
117 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
118 ActorRef roleChangeNotifier) {
119 this.peerAddresses = peerAddresses;
121 this.config = config;
122 this.dataPersistenceProvider = dataPersistenceProvider;
123 this.roleChangeNotifier = roleChangeNotifier;
// Builds the actor and then injects the role-change notifier, which is not a constructor argument.
127 public MockRaftActor create() throws Exception {
128 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
129 dataPersistenceProvider);
130 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
131 return mockRaftActor;
// Creates the actor with an empty state list and a mocked delegate. A null
// dataPersistenceProvider selects setPersistence(true); otherwise the supplied provider is
// installed (the branch keyword between the two calls is not visible in this view).
135 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
136 DataPersistenceProvider dataPersistenceProvider) {
137 super(id, peerAddresses, config);
138 state = new ArrayList<>();
139 this.delegate = mock(RaftActor.class);
140 if(dataPersistenceProvider == null){
141 setPersistence(true);
143 setPersistence(dataPersistenceProvider);
// Blocks for up to 5 seconds and asserts that onRecoveryComplete() has been called.
147 public void waitForRecoveryComplete() {
149 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
150 } catch (InterruptedException e) {
// Blocks for up to 5 seconds and asserts that initializeBehavior() has finished.
155 public void waitForInitializeBehaviorComplete() {
157 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
158 } catch (InterruptedException e) {
// Polls up to 10 times, sleeping 100 ms between attempts.
// NOTE(review): the leader check inside the loop is not visible in this view.
164 public void waitUntilLeader(){
165 for(int i = 0;i < 10; i++){
169 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// Accessor for the recovered state list (body not visible here; presumably returns `state`).
173 public List<Object> getState() {
// Props factories. Each overload passes null for the arguments it does not take, so the actor
// falls back to setPersistence(true) and/or has no role-change notifier.
177 public static Props props(final String id, final Map<String, String> peerAddresses,
178 Optional<ConfigParams> config){
179 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
// Props with an explicit DataPersistenceProvider and no role-change notifier.
182 public static Props props(final String id, final Map<String, String> peerAddresses,
183 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
184 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
// Props with a role-change notifier and no explicit DataPersistenceProvider.
187 public static Props props(final String id, final Map<String, String> peerAddresses,
188 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
189 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
// Props with both a role-change notifier and an explicit DataPersistenceProvider.
192 public static Props props(final String id, final Map<String, String> peerAddresses,
193 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
194 DataPersistenceProvider dataPersistenceProvider){
195 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
// Forwards to the mock delegate so tests can verify the call, then logs it.
199 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
200 delegate.applyState(clientActor, identifier, data);
201 LOG.info("{}: applyState called", persistenceId());
// Recovery batching hooks; their bodies are not visible in this view.
205 protected void startLogRecoveryBatch(int maxBatchSize) {
209 protected void appendRecoveredLogEntry(Payload data) {
214 protected void applyCurrentLogRecoveryBatch() {
// Notifies the delegate and releases any thread blocked in waitForRecoveryComplete().
218 protected void onRecoveryComplete() {
219 delegate.onRecoveryComplete();
220 recoveryComplete.countDown();
// Runs the real behavior initialization, then releases waitForInitializeBehaviorComplete().
224 protected void initializeBehavior() {
225 super.initializeBehavior();
226 initializeBehaviorComplete.countDown();
// Forwards the raw bytes to the delegate, then deserializes them; when the result is a List
// its elements are appended to `state` so tests can inspect the recovered contents.
230 protected void applyRecoverySnapshot(byte[] bytes) {
231 delegate.applyRecoverySnapshot(bytes);
233 Object data = toObject(bytes);
234 if (data instanceof List) {
235 state.addAll((List<?>) data);
237 } catch (Exception e) {
// Logs the call and forwards it to the delegate for verification.
242 @Override protected void createSnapshot() {
243 LOG.info("{}: createSnapshot called", persistenceId());
244 delegate.createSnapshot();
// Logs the call and forwards the snapshot bytes to the delegate for verification.
247 @Override protected void applySnapshot(byte [] snapshot) {
248 LOG.info("{}: applySnapshot called", persistenceId());
249 delegate.applySnapshot(snapshot);
// Forwards state-change notifications to the delegate.
252 @Override protected void onStateChanged() {
253 delegate.onStateChanged();
// Absent when no notifier was supplied through props(...).
257 protected Optional<ActorRef> getRoleChangeNotifier() {
258 return Optional.fromNullable(roleChangeNotifier);
// Persistence id of this actor (body not visible in this view).
261 @Override public String persistenceId() {
// Deserializes a Java-serialized object from the given bytes using an ObjectInputStream.
// NOTE(review): the stream cleanup and return statement are not visible in this view.
265 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
267 ByteArrayInputStream bis = null;
268 ObjectInputStream ois = null;
270 bis = new ByteArrayInputStream(bs);
271 ois = new ObjectInputStream(bis);
272 obj = ois.readObject();
// Convenience accessor for the replicated log held by the actor's RaftActorContext.
284 public ReplicatedLog getReplicatedLog(){
285 return this.getRaftActorContext().getReplicatedLog();
// JavaTestKit that starts a MockRaftActor with no peers and default configuration, and offers
// helpers for waiting on log messages and on leader election.
291 public static class RaftActorTestKit extends JavaTestKit {
292 private final ActorRef raftActor;
// Creates the raft actor under the given name with an empty peer map and absent ConfigParams.
294 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
297 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
298 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
// Accessor for the actor created by the constructor (body not visible in this view).
303 public ActorRef getRaftActor() {
// Uses a JavaTestKit.EventFilter restricted to events from the raft actor's path and expecting
// exactly one occurrence.
// NOTE(review): how `message` is matched and what is returned is not visible in this view.
307 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
308 // Wait for a specific log message to show up
310 new JavaTestKit.EventFilter<Boolean>(logEventClass
313 protected Boolean run() {
316 }.from(raftActor.path().toString())
318 .occurrences(1).exec();
// Waits until the actor owned by this kit reports a leader.
323 protected void waitUntilLeader(){
324 waitUntilLeader(raftActor);
// Repeatedly asks the actor FindLeader (100 ms ask/await timeout, 50 ms pause between attempts,
// at most 20 * 5 = 100 attempts) until the reply carries a non-null leader actor; fails the test
// if none is reported. Timeouts are retried; other exceptions are reported on stderr.
// NOTE(review): the statement executed when a leader is found is not visible here (presumably a return).
327 public static void waitUntilLeader(ActorRef actorRef) {
328 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
329 for(int i = 0; i < 20 * 5; i++) {
330 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
332 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
333 if(resp.getLeaderActor() != null) {
336 } catch(TimeoutException e) {
337 } catch(Exception e) {
338 System.err.println("FindLeader threw ex");
343 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
346 Assert.fail("Leader not found for actorRef " + actorRef.path());
// Smoke test: a MockRaftActor with no peers can be created and reports a leader
// within the polling window of RaftActorTestKit.waitUntilLeader().
353 public void testConstruction() {
354 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
// With no peers configured, FindLeader must eventually return a non-null leader actor;
// waitUntilLeader() fails the test otherwise.
358 public void testFindLeaderWhenLeaderIsSelf(){
359 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
360 kit.waitUntilLeader();
// Seeds the in-memory snapshot store and journal for an actor id, stops the running actor,
// re-creates it under the same persistence id and checks that the replicated log, last index,
// last applied, commit index and recovered state reflect the persisted data.
// NOTE(review): the declarations of `lastIndex` and the population of `entries` are not visible
// in this view.
364 public void testRaftActorRecovery() throws Exception {
365 new JavaTestKit(getSystem()) {{
366 String persistenceId = factory.generateActorId("follower-");
368 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
369 // Set the heartbeat interval high to essentially disable election otherwise the test
370 // may fail if the actor is switched to Leader and the commitIndex is set to the last
372 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
374 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
375 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
// Watch the actor so its termination can be observed below.
377 watch(followerActor);
// Entry at index 4 is stored in the snapshot as an unapplied log entry.
379 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
380 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
381 new MockRaftActorContext.MockPayload("E"));
382 snapshotUnappliedEntries.add(entry1);
384 int lastAppliedDuringSnapshotCapture = 3;
385 int lastIndexDuringSnapshotCapture = 4;
387 // 4 messages as part of snapshot, which are applied to state
388 ByteString snapshotBytes = fromObject(Arrays.asList(
389 new MockRaftActorContext.MockPayload("A"),
390 new MockRaftActorContext.MockPayload("B"),
391 new MockRaftActorContext.MockPayload("C"),
392 new MockRaftActorContext.MockPayload("D")));
394 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
395 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
396 lastAppliedDuringSnapshotCapture, 1);
397 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
399 // add more entries after snapshot is taken
400 List<ReplicatedLogEntry> entries = new ArrayList<>();
401 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
402 new MockRaftActorContext.MockPayload("F"));
403 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
404 new MockRaftActorContext.MockPayload("G"));
405 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
406 new MockRaftActorContext.MockPayload("H"));
411 int lastAppliedToState = 5;
// Journal sequence numbers 5-8: log entries interleaved with an ApplyJournalEntries marker.
414 InMemoryJournal.addEntry(persistenceId, 5, entry2);
415 // 2 entries are applied to state besides the 4 entries in snapshot
416 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
417 InMemoryJournal.addEntry(persistenceId, 7, entry3);
418 InMemoryJournal.addEntry(persistenceId, 8, entry4);
// Stop the first incarnation and wait for its Terminated notification.
421 followerActor.tell(PoisonPill.getInstance(), null);
422 expectMsgClass(duration("5 seconds"), Terminated.class);
424 unwatch(followerActor);
426 //reinstate the actor
427 TestActorRef<MockRaftActor> ref = factory.createTestActor(
428 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
429 Optional.<ConfigParams>of(config)));
431 ref.underlyingActor().waitForRecoveryComplete();
// The recovered context must match what was seeded into the snapshot store and journal.
433 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
434 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
435 context.getReplicatedLog().size());
436 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
437 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
438 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
439 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
// Recovery must also honour the older ApplyLogEntries journal message: after replaying three
// log entries and an ApplyLogEntries(1), last applied and commit index are expected to be 1.
// NOTE(review): the declarations of `seqNr` and `lastIndex` are not visible in this view.
444 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
445 new JavaTestKit(getSystem()) {{
446 String persistenceId = factory.generateActorId("leader-");
448 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day heartbeat interval, as in the other tests in this file.
449 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
451 // Setup the persisted journal with some entries
452 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
453 new MockRaftActorContext.MockPayload("zero"));
454 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
455 new MockRaftActorContext.MockPayload("oen"));
456 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
457 new MockRaftActorContext.MockPayload("two"));
// The ApplyLogEntries marker sits between entry1 and entry2 in the journal.
460 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
461 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
462 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
463 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
465 int lastAppliedToState = 1;
468 //reinstate the actor
469 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
470 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
471 Optional.<ConfigParams>of(config)));
473 leaderActor.underlyingActor().waitForRecoveryComplete();
// All three entries are in the log, but only indexes up to 1 count as applied/committed.
475 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
476 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
477 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
478 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
479 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
484 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
485 * process recovery messages
// Feeds recovery messages directly into onReceiveRecover() on an actor created without an
// explicit DataPersistenceProvider (so setPersistence(true) is used) and checks that each one
// takes effect: snapshot offer, log entries, ApplyJournalEntries, DeleteEntries and
// UpdateElectionTerm.
491 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
492 new JavaTestKit(getSystem()) {
494 String persistenceId = factory.generateActorId("leader-");
496 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
498 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
500 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
501 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
503 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
505 // Wait for akka's recovery to complete so it doesn't interfere.
506 mockRaftActor.waitForRecoveryComplete();
508 ByteString snapshotBytes = fromObject(Arrays.asList(
509 new MockRaftActorContext.MockPayload("A"),
510 new MockRaftActorContext.MockPayload("B"),
511 new MockRaftActorContext.MockPayload("C"),
512 new MockRaftActorContext.MockPayload("D")));
514 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
515 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// The snapshot offer must reach applyRecoverySnapshot() with the same bytes.
517 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
519 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
// Each recovered log entry grows the replicated log by one.
521 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
523 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
525 assertEquals("add replicated log entry", 1, replicatedLog.size());
527 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
529 assertEquals("add replicated log entry", 2, replicatedLog.size());
531 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
533 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
535 // The snapshot had 4 items + we added 2 more items during the test
536 // We start removing from 5 and we should get 1 item in the replicated log
537 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
539 assertEquals("remove log entries", 1, replicatedLog.size());
// A recovered UpdateElectionTerm must update both the current term and votedFor.
541 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
543 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
544 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
546 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
552 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
553 * not process recovery messages
// Counterpart of the "recovery applicable" test: the actor is created with a
// DataPersistenceProviderMonitor as its persistence provider, and every recovery message fed
// into onReceiveRecover() is expected to be ignored (no snapshot applied, log stays empty,
// commit index stays -1, term information unchanged).
558 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
559 new JavaTestKit(getSystem()) {
561 String persistenceId = factory.generateActorId("leader-");
563 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
565 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
567 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
568 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
570 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
572 // Wait for akka's recovery to complete so it doesn't interfere.
573 mockRaftActor.waitForRecoveryComplete();
575 ByteString snapshotBytes = fromObject(Arrays.asList(
576 new MockRaftActorContext.MockPayload("A"),
577 new MockRaftActorContext.MockPayload("B"),
578 new MockRaftActorContext.MockPayload("C"),
579 new MockRaftActorContext.MockPayload("D")));
581 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
582 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// The snapshot offer must not reach applyRecoverySnapshot() at all.
584 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
586 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
// Recovered log entries must not be added to the replicated log.
588 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
590 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
592 assertEquals("add replicated log entry", 0, replicatedLog.size());
594 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
596 assertEquals("add replicated log entry", 0, replicatedLog.size());
598 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
600 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
602 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
604 assertEquals("remove log entries", 0, replicatedLog.size());
// The recovered term/vote must not be adopted.
606 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
608 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
609 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
611 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Checks that updateAndPersist() on the term information goes through the configured
// DataPersistenceProvider: the monitor's persist latch must be released within 5 seconds.
617 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
618 new JavaTestKit(getSystem()) {
620 String persistenceId = factory.generateActorId("leader-");
622 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
624 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Latch handed to the monitor; the final assertion waits on it.
626 CountDownLatch persistLatch = new CountDownLatch(1);
627 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
628 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
630 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
631 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
633 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
635 mockRaftActor.waitForInitializeBehaviorComplete();
637 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
639 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Checks that appendAndPersist() on the replicated log calls persist() on the configured
// (mocked) DataPersistenceProvider with the appended entry.
645 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
646 new JavaTestKit(getSystem()) {
648 String persistenceId = factory.generateActorId("leader-");
650 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
652 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
654 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
656 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
657 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
659 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
661 mockRaftActor.waitForInitializeBehaviorComplete();
663 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
665 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
// The exact entry instance must be what was handed to persist().
667 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
// Checks that removeFromAndPersist() on the replicated log goes through the configured
// (mocked) DataPersistenceProvider.
673 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
674 new JavaTestKit(getSystem()) {
676 String persistenceId = factory.generateActorId("leader-");
678 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
680 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
682 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
684 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
685 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
687 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
689 mockRaftActor.waitForInitializeBehaviorComplete();
691 mockRaftActor.waitUntilLeader();
693 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
695 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
// NOTE(review): three persist() calls are expected in total; presumably the append, the removal
// and one call made by the actor itself during start-up/election - TODO confirm.
697 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
// Checks that handling an ApplyJournalEntries command results in a persist() call on the
// configured (mocked) DataPersistenceProvider.
703 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
704 new JavaTestKit(getSystem()) {
706 String persistenceId = factory.generateActorId("leader-");
708 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
710 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
712 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
714 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
715 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
717 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
719 mockRaftActor.waitForInitializeBehaviorComplete();
721 mockRaftActor.waitUntilLeader();
723 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
// NOTE(review): two persist() calls are expected in total; presumably one for the command above
// and one made by the actor itself during start-up/election - TODO confirm.
725 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
// Checks that a CaptureSnapshotReply received after a capture has been started causes
// saveSnapshot() to be called on the configured (mocked) DataPersistenceProvider.
733 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
734 new JavaTestKit(getSystem()) {
736 String persistenceId = factory.generateActorId("leader-");
738 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
740 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
742 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
744 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
745 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
746 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
748 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
750 mockRaftActor.waitForInitializeBehaviorComplete();
752 ByteString snapshotBytes = fromObject(Arrays.asList(
753 new MockRaftActorContext.MockPayload("A"),
754 new MockRaftActorContext.MockPayload("B"),
755 new MockRaftActorContext.MockPayload("C"),
756 new MockRaftActorContext.MockPayload("D")));
758 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
// Start a capture first so the reply below has a capture in progress to complete.
760 raftActorContext.getSnapshotManager().capture(
761 new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
762 new MockRaftActorContext.MockPayload("D")), -1);
// The test installs a Leader behavior explicitly before delivering the reply.
764 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
766 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
768 verify(dataPersistenceProvider).saveSnapshot(anyObject());
// Drives a full snapshot cycle (capture -> CaptureSnapshotReply -> SaveSnapshotSuccess) on an
// actor in Follower behavior and checks that old snapshots and journal messages are deleted
// through the DataPersistenceProvider and that the replicated log is trimmed up to the
// replicatedToAllIndex.
775 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
776 new JavaTestKit(getSystem()) {
778 String persistenceId = factory.generateActorId("leader-");
780 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
782 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
784 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
// Unlike most tests in this file, this actor is configured with one peer ("leader").
786 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
787 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
789 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
791 mockRaftActor.waitForInitializeBehaviorComplete();
792 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
// Populate the log with indexes 0..4 (lastEntry is index 4).
794 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
795 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
796 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
797 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
798 mockRaftActor.getReplicatedLog().append(lastEntry);
800 ByteString snapshotBytes = fromObject(Arrays.asList(
801 new MockRaftActorContext.MockPayload("A"),
802 new MockRaftActorContext.MockPayload("B"),
803 new MockRaftActorContext.MockPayload("C"),
804 new MockRaftActorContext.MockPayload("D")));
806 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
807 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
809 long replicatedToAllIndex = 1;
811 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
// Starting the capture must have asked the actor to create a snapshot.
813 verify(mockRaftActor.delegate).createSnapshot();
815 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
817 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
819 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
// 100 is the sequence number carried in the SnapshotMetadata above.
821 verify(dataPersistenceProvider).deleteMessages(100);
823 assertEquals(3, mockRaftActor.getReplicatedLog().size());
824 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
// Indexes 2..4 must still be present after the log has been trimmed.
826 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
827 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
828 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
830 // Indexes 0 and 1 will not be in the log because they were removed due to snapshotting
831 assertNull(mockRaftActor.getReplicatedLog().get(1));
832 assertNull(mockRaftActor.getReplicatedLog().get(0));
// Checks that an ApplyState command is forwarded to the actor's applyState() callback with the
// same client actor and identifier.
839 public void testApplyState() throws Exception {
841 new JavaTestKit(getSystem()) {
843 String persistenceId = factory.generateActorId("leader-");
845 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
847 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
849 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
851 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
852 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
854 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
856 mockRaftActor.waitForInitializeBehaviorComplete();
858 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
859 new MockRaftActorContext.MockPayload("F"));
861 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
// The data argument is matched with anyObject(); only client actor and identifier are pinned.
863 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
// Checks that an ApplySnapshot command hands the snapshot state to applySnapshot(), replaces the
// replicated log with a new, empty instance and sets lastApplied from the snapshot.
870 public void testApplySnapshot() throws Exception {
871 new JavaTestKit(getSystem()) {
873 String persistenceId = factory.generateActorId("leader-");
875 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
877 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
879 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
881 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
882 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
884 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
886 mockRaftActor.waitForInitializeBehaviorComplete();
// Keep a reference to the current log so the test can detect that it gets replaced.
888 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
890 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
891 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
892 oldReplicatedLog.append(
893 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
894 mock(Payload.class)));
896 ByteString snapshotBytes = fromObject(Arrays.asList(
897 new MockRaftActorContext.MockPayload("A"),
898 new MockRaftActorContext.MockPayload("B"),
899 new MockRaftActorContext.MockPayload("C"),
900 new MockRaftActorContext.MockPayload("D")));
// Mocked snapshot: only getState() and getLastAppliedIndex() are stubbed.
902 Snapshot snapshot = mock(Snapshot.class);
904 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
906 doReturn(3L).when(snapshot).getLastAppliedIndex();
908 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
910 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
// Reference comparison: the actor must now hold a different ReplicatedLog instance.
912 assertTrue("The replicatedLog should have changed",
913 oldReplicatedLog != mockRaftActor.getReplicatedLog());
915 assertEquals("lastApplied should be same as in the snapshot",
916 (Long) 3L, mockRaftActor.getLastApplied());
918 assertEquals(0, mockRaftActor.getReplicatedLog().size());
// Verifies that a SaveSnapshotFailure, received after a snapshot capture has been started
// and answered with a CaptureSnapshotReply, leaves the replicated log's snapshot index at -1.
925 public void testSaveSnapshotFailure() throws Exception {
926 new JavaTestKit(getSystem()) {
928 String persistenceId = factory.generateActorId("leader-");
930 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day heartbeat interval; presumably chosen so that no heartbeat fires during the test.
932 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
934 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
936 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
937 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
939 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
941 mockRaftActor.waitForInitializeBehaviorComplete();
// Serialized list of four payloads used as the captured snapshot state.
943 ByteString snapshotBytes = fromObject(Arrays.asList(
944 new MockRaftActorContext.MockPayload("A"),
945 new MockRaftActorContext.MockPayload("B"),
946 new MockRaftActorContext.MockPayload("C"),
947 new MockRaftActorContext.MockPayload("D")));
949 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
// Install a Leader behavior directly instead of waiting for an election.
951 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
// Start a capture through the snapshot manager ...
953 raftActorContext.getSnapshotManager().capture(
954 new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
955 new MockRaftActorContext.MockPayload("D")), 1);
// ... answer it with the serialized state ...
957 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// ... and then report that saving the snapshot failed.
959 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
962 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
963 mockRaftActor.getReplicatedLog().getSnapshotIndex());
// Verifies the notifications sent to the role-change notifier by an actor created with an
// empty peer map. Three RoleChanged messages are expected (null -> Follower -> Candidate ->
// Leader) plus a LeaderStateChanged naming this actor as leader. The test then swaps in a
// Follower behavior and expects a LeaderStateChanged with a null leader id and a
// Leader -> Follower RoleChanged. Finally, once that follower has handled a message and set
// its leader id, a LeaderStateChanged carrying the new leader id is expected.
970 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
971 new JavaTestKit(getSystem()) {{
// Collector actor that plays the role of the notifier and records what it receives.
972 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
973 Props.create(MessageCollectorActor.class));
974 MessageCollectorActor.waitUntilReady(notifierActor);
976 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
977 long heartBeatInterval = 100;
978 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
// NOTE(review): presumably the election timeout is derived from heartbeat interval * factor - confirm.
979 config.setElectionTimeoutFactor(20);
981 String persistenceId = factory.generateActorId("notifier-");
983 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
984 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
985 new NonPersistentDataProvider()), persistenceId);
// Three RoleChanged notifications are expected from startup through becoming leader.
987 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
990 // check if the notifier got a role change from null to Follower
991 RoleChanged raftRoleChanged = matches.get(0);
992 assertEquals(persistenceId, raftRoleChanged.getMemberId());
993 assertNull(raftRoleChanged.getOldRole());
994 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
996 // check if the notifier got a role change from Follower to Candidate
997 raftRoleChanged = matches.get(1);
998 assertEquals(persistenceId, raftRoleChanged.getMemberId());
999 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1000 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1002 // check if the notifier got a role change from Candidate to Leader
1003 raftRoleChanged = matches.get(2);
1004 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1005 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1006 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
// The leader announced in LeaderStateChanged must be this actor itself
// (raftRoleChanged still refers to the Candidate -> Leader notification).
1008 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1009 notifierActor, LeaderStateChanged.class);
1011 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
// Drop everything collected so far so that the next expectations only see new notifications.
1013 notifierActor.underlyingActor().clear();
1015 MockRaftActor raftActor = raftActorRef.underlyingActor();
1016 final String newLeaderId = "new-leader";
// Follower subclass whose handleMessage assigns newLeaderId to leaderId for any message.
1017 Follower follower = new Follower(raftActor.getRaftActorContext()) {
1019 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1020 leaderId = newLeaderId;
1025 raftActor.changeCurrentBehavior(follower);
// Right after the switch the follower has not handled a message yet, so no leader id is reported.
1027 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1028 assertEquals(persistenceId, leaderStateChange.getMemberId());
1029 assertEquals(null, leaderStateChange.getLeaderId());
1031 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1032 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1033 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1035 notifierActor.underlyingActor().clear();
// The message content is irrelevant; it only has to reach the overridden handleMessage above.
1037 raftActor.handleCommand("any");
1039 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1040 assertEquals(persistenceId, leaderStateChange.getMemberId());
1041 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
// Verifies the notifications sent to the role-change notifier by an actor that has one peer
// registered under a fake path: exactly two RoleChanged messages are expected,
// null -> Follower and Follower -> Candidate.
1046 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1047 new JavaTestKit(getSystem()) {{
// Collector actor that plays the role of the notifier and records what it receives.
1048 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1049 MessageCollectorActor.waitUntilReady(notifierActor);
1051 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1052 long heartBeatInterval = 100;
1053 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1054 config.setElectionTimeoutFactor(1);
1056 String persistenceId = factory.generateActorId("notifier-");
// The single peer "leader" is registered with the address "fake/path".
1058 factory.createActor(MockRaftActor.props(persistenceId,
1059 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
// Poll the collector in heartbeat-sized steps for up to 5000 ms.
// NOTE(review): the loop watches for a third RoleChanged while the assertion after the loop
// requires exactly two, so the passing path presumably always waits the full 5 s to show
// that no third role change arrives - confirm that this is intended.
1061 List<RoleChanged> matches = null;
1062 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1063 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1064 assertNotNull(matches);
1065 if(matches.size() == 3) {
1068 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1071 assertEquals(2, matches.size());
1073 // check if the notifier got a role change from null to Follower
1074 RoleChanged raftRoleChanged = matches.get(0);
1075 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1076 assertNull(raftRoleChanged.getOldRole());
1077 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1079 // check if the notifier got a role change from Follower to Candidate
1080 raftRoleChanged = matches.get(1);
1081 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1082 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1083 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
// Exercises log trimming on a leader with one follower while a snapshot capture is running.
// AppendEntriesReply messages received during the capture must leave the log at 8 entries.
// After persist and commit on the snapshot manager the log must hold 3 entries with last
// index 7. After one more append, a reply for index 7 must leave 2 entries with last index 8.
1089 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1090 new JavaTestKit(getSystem()) {
1092 String persistenceId = factory.generateActorId("leader-");
1093 String follower1Id = factory.generateActorId("follower-");
// The follower is only a message collector; it never answers by itself.
1095 ActorRef followerActor1 =
1096 factory.createActor(Props.create(MessageCollectorActor.class));
1098 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably so that neither timer fires while the test runs.
1099 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1100 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The provider given to the actor is a Mockito mock with nothing stubbed on it.
1102 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1104 Map<String, String> peerAddresses = new HashMap<>();
1105 peerAddresses.put(follower1Id, followerActor1.path().toString());
1107 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1108 MockRaftActor.props(persistenceId, peerAddresses,
1109 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1111 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Pretend indices up to 4 are committed and applied, in term 1.
1113 leaderActor.getRaftActorContext().setCommitIndex(4);
1114 leaderActor.getRaftActorContext().setLastApplied(4);
1115 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1117 leaderActor.waitForInitializeBehaviorComplete();
1119 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
// Install a Leader behavior directly instead of waiting for an election.
1121 Leader leader = new Leader(leaderActor.getRaftActorContext());
1122 leaderActor.setCurrentBehavior(leader);
1123 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1125 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1126 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1128 assertEquals(8, leaderActor.getReplicatedLog().size());
// Start a capture through the snapshot manager; the delegate must be asked to create a snapshot.
1130 leaderActor.getRaftActorContext().getSnapshotManager()
1131 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1132 new MockRaftActorContext.MockPayload("x")), 4);
1134 verify(leaderActor.delegate).createSnapshot();
1136 assertEquals(8, leaderActor.getReplicatedLog().size());
1138 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1139 //fake snapshot on index 5
1140 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
// The log size is unchanged by the reply received while the capture is pending.
1142 assertEquals(8, leaderActor.getReplicatedLog().size());
1144 //fake snapshot on index 6
1145 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1146 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1147 assertEquals(8, leaderActor.getReplicatedLog().size());
1149 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1151 assertEquals(8, leaderActor.getReplicatedLog().size());
// Serialized list of five payloads used as the snapshot state.
1153 ByteString snapshotBytes = fromObject(Arrays.asList(
1154 new MockRaftActorContext.MockPayload("foo-0"),
1155 new MockRaftActorContext.MockPayload("foo-1"),
1156 new MockRaftActorContext.MockPayload("foo-2"),
1157 new MockRaftActorContext.MockPayload("foo-3"),
1158 new MockRaftActorContext.MockPayload("foo-4")));
// Persist goes through the snapshot manager with a fresh NonPersistentDataProvider,
// not through the mocked provider the actor was created with.
1160 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1161 , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1163 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1165 // The commit is needed to complete the snapshot creation process
1166 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1168 // capture snapshot reply should remove the snapshotted entries only
1169 assertEquals(3, leaderActor.getReplicatedLog().size());
1170 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1172 // add another non-replicated entry
1173 leaderActor.getReplicatedLog().append(
1174 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1176 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1177 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1178 assertEquals(2, leaderActor.getReplicatedLog().size());
1179 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
// Exercises log trimming on a follower while a snapshot capture is running. AppendEntries
// messages received during the capture must grow the log from 6 to 8 entries. After the
// CaptureSnapshotReply and the commit, 3 entries with last index 7 must remain. One further
// AppendEntries must leave 2 entries in the log.
1186 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1187 new JavaTestKit(getSystem()) {
1189 String persistenceId = factory.generateActorId("follower-");
1190 String leaderId = factory.generateActorId("leader-");
// The leader is only a message collector; the test itself plays the leader's part below.
1193 ActorRef leaderActor1 =
1194 factory.createActor(Props.create(MessageCollectorActor.class));
1196 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably so that neither timer fires while the test runs.
1197 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1198 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The provider given to the actor is a Mockito mock with nothing stubbed on it.
1200 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1202 Map<String, String> peerAddresses = new HashMap<>();
1203 peerAddresses.put(leaderId, leaderActor1.path().toString());
1205 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1206 MockRaftActor.props(persistenceId, peerAddresses,
1207 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1209 MockRaftActor followerActor = mockActorRef.underlyingActor();
// Pretend indices up to 4 are committed and applied, in term 1.
1210 followerActor.getRaftActorContext().setCommitIndex(4);
1211 followerActor.getRaftActorContext().setLastApplied(4);
1212 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1214 followerActor.waitForInitializeBehaviorComplete();
// Install a Follower behavior explicitly.
1217 Follower follower = new Follower(followerActor.getRaftActorContext());
1218 followerActor.setCurrentBehavior(follower);
1219 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1221 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1222 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1223 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1225 // log has indices 0-5
1226 assertEquals(6, followerActor.getReplicatedLog().size());
// Start a capture through the snapshot manager; the delegate must be asked to create a snapshot.
1229 followerActor.getRaftActorContext().getSnapshotManager().capture(
1230 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1231 new MockRaftActorContext.MockPayload("D")), 4);
1233 verify(followerActor.delegate).createSnapshot();
1235 assertEquals(6, followerActor.getReplicatedLog().size());
1237 //fake snapshot on index 6
1238 List<ReplicatedLogEntry> entries =
1240 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1241 new MockRaftActorContext.MockPayload("foo-6"))
// The entry is appended while the capture is pending; nothing is trimmed yet.
1243 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1244 assertEquals(7, followerActor.getReplicatedLog().size());
1246 //fake snapshot on index 7
1247 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1251 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1252 new MockRaftActorContext.MockPayload("foo-7"))
1254 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1255 assertEquals(8, followerActor.getReplicatedLog().size());
1257 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
// Serialized list of five payloads used as the snapshot state.
1260 ByteString snapshotBytes = fromObject(Arrays.asList(
1261 new MockRaftActorContext.MockPayload("foo-0"),
1262 new MockRaftActorContext.MockPayload("foo-1"),
1263 new MockRaftActorContext.MockPayload("foo-2"),
1264 new MockRaftActorContext.MockPayload("foo-3"),
1265 new MockRaftActorContext.MockPayload("foo-4")));
1266 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1267 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1269 // The commit is needed to complete the snapshot creation process
1270 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1272 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1273 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1274 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// NOTE(review): this entry is built with 8 but reuses the payload text "foo-7"; it looks like
// a copy-paste leftover. None of the assertions below inspect the payload.
1278 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1279 new MockRaftActorContext.MockPayload("foo-7"))
1281 // send an additional entry 8 with leaderCommit = 7
1282 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1284 // 7 and 8, as lastapplied is 7
1285 assertEquals(2, followerActor.getReplicatedLog().size());
// Exercises a leader with two followers. The log must stay at 5 entries through the
// AppendEntriesReply and SendHeartBeat messages, and a later reply from the second follower
// must not shrink it either (per the assertion message, because an initiated snapshot is in
// progress). The CaptureSnapshotReply must then end capturing and empty the log, and a
// further reply from the second follower must leave the log empty.
1292 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1293 new JavaTestKit(getSystem()) {
1295 String persistenceId = factory.generateActorId("leader-");
1296 String follower1Id = factory.generateActorId("follower-");
1297 String follower2Id = factory.generateActorId("follower-");
// Both followers are message collectors created under their generated ids.
1299 ActorRef followerActor1 =
1300 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1301 ActorRef followerActor2 =
1302 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1304 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably so that neither timer fires while the test runs.
1305 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1306 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The provider given to the actor is a Mockito mock with nothing stubbed on it.
1308 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1310 Map<String, String> peerAddresses = new HashMap<>();
1311 peerAddresses.put(follower1Id, followerActor1.path().toString());
1312 peerAddresses.put(follower2Id, followerActor2.path().toString());
1314 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1315 MockRaftActor.props(persistenceId, peerAddresses,
1316 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1318 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Pretend indices up to 9 are committed and applied, in term 1.
1319 leaderActor.getRaftActorContext().setCommitIndex(9);
1320 leaderActor.getRaftActorContext().setLastApplied(9);
1321 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1323 leaderActor.waitForInitializeBehaviorComplete();
// Install a Leader behavior directly instead of waiting for an election.
1325 Leader leader = new Leader(leaderActor.getRaftActorContext());
1326 leaderActor.setCurrentBehavior(leader);
1327 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1329 // create 5 entries in the log
1330 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1331 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1333 //set the snapshot index to 4 , 0 to 4 are snapshotted
1334 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1335 //setting replicatedToAllIndex = 9, for the log to clear
1336 leader.setReplicatedToAllIndex(9);
1337 assertEquals(5, leaderActor.getReplicatedLog().size());
1338 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Reply from the first follower; the log size is unchanged afterwards.
1340 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1341 assertEquals(5, leaderActor.getReplicatedLog().size());
1342 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1344 // set the 2nd follower nextIndex to 1 which has been snapshotted
1345 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1346 assertEquals(5, leaderActor.getReplicatedLog().size());
1347 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1349 // simulate a real snapshot
1350 leaderActor.onReceiveCommand(new SendHeartBeat());
1351 assertEquals(5, leaderActor.getReplicatedLog().size());
// The failure message includes the actual state and the current leader id for diagnosis.
1352 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1353 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1354 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1357 //reply from a slow follower does not initiate a fake snapshot
1358 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1359 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
// Serialized list of five payloads used as the snapshot state.
1361 ByteString snapshotBytes = fromObject(Arrays.asList(
1362 new MockRaftActorContext.MockPayload("foo-0"),
1363 new MockRaftActorContext.MockPayload("foo-1"),
1364 new MockRaftActorContext.MockPayload("foo-2"),
1365 new MockRaftActorContext.MockPayload("foo-3"),
1366 new MockRaftActorContext.MockPayload("foo-4")));
1367 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1368 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1370 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1372 //reply from a slow follower after should not raise errors
1373 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1374 assertEquals(0, leaderActor.getReplicatedLog().size());
// Leader without peers and with a snapshot batch count of 5. After four appended entries,
// one more entry is persisted and a CaptureSnapshotReply is sent. Afterwards the snapshot
// index and the leader's replicatedToAllIndex must both be -1 and no capture may be in progress.
1380 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1381 new JavaTestKit(getSystem()) {{
1382 String persistenceId = factory.generateActorId("leader-");
1383 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably so that neither timer fires while the test runs.
1384 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1385 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1386 config.setSnapshotBatchCount(5);
1388 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// The peer map stays empty: this leader has no followers.
1390 Map<String, String> peerAddresses = new HashMap<>();
1392 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1393 MockRaftActor.props(persistenceId, peerAddresses,
1394 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1396 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Pretend indices up to 3 are committed and applied, in term 1.
1397 leaderActor.getRaftActorContext().setCommitIndex(3);
1398 leaderActor.getRaftActorContext().setLastApplied(3);
1399 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1401 leaderActor.waitForInitializeBehaviorComplete();
// Append four entries directly to the replicated log.
1402 for(int i=0;i< 4;i++) {
1403 leaderActor.getReplicatedLog()
1404 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1405 new MockRaftActorContext.MockPayload("A")));
// Install a Leader behavior directly instead of waiting for an election.
1408 Leader leader = new Leader(leaderActor.getRaftActorContext());
1409 leaderActor.setCurrentBehavior(leader);
1410 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1412 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1413 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1415 // Now send a CaptureSnapshotReply
1416 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1418 // Trimming log in this scenario is a no-op
1419 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1420 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1421 assertEquals(-1, leader.getReplicatedToAllIndex());
// Leader without peers and with a snapshot batch count of 5. The snapshot index and the
// leader's replicatedToAllIndex are both preset to 3, and no entries are appended
// beforehand. After persisting one entry and sending a CaptureSnapshotReply, both values
// must still be 3 and no capture may be in progress.
1427 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1428 new JavaTestKit(getSystem()) {{
1429 String persistenceId = factory.generateActorId("leader-");
1430 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably so that neither timer fires while the test runs.
1431 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1432 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1433 config.setSnapshotBatchCount(5);
1435 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// The peer map stays empty: this leader has no followers.
1437 Map<String, String> peerAddresses = new HashMap<>();
1439 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1440 MockRaftActor.props(persistenceId, peerAddresses,
1441 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1443 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Pretend indices up to 3 are committed, applied and already covered by a snapshot, in term 1.
1444 leaderActor.getRaftActorContext().setCommitIndex(3);
1445 leaderActor.getRaftActorContext().setLastApplied(3);
1446 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1447 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1449 leaderActor.waitForInitializeBehaviorComplete();
// Install a Leader behavior directly and set its replicatedToAllIndex by hand.
1450 Leader leader = new Leader(leaderActor.getRaftActorContext());
1451 leaderActor.setCurrentBehavior(leader);
1452 leader.setReplicatedToAllIndex(3);
1453 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1455 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1456 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1458 // Now send a CaptureSnapshotReply
1459 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1461 // Trimming log in this scenario is a no-op
1462 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1463 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1464 assertEquals(3, leader.getReplicatedToAllIndex());
1469 private ByteString fromObject(Object snapshot) throws Exception {
1470 ByteArrayOutputStream b = null;
1471 ObjectOutputStream o = null;
1473 b = new ByteArrayOutputStream();
1474 o = new ObjectOutputStream(b);
1475 o.writeObject(snapshot);
1476 byte[] snapshotBytes = b.toByteArray();
1477 return ByteString.copyFrom(snapshotBytes);