1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RoleChanged;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
64 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
65 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
67 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
68 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
69 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
71 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
73 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
74 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
75 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
76 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
77 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
78 import scala.concurrent.Await;
79 import scala.concurrent.Future;
80 import scala.concurrent.duration.Duration;
81 import scala.concurrent.duration.FiniteDuration;
// Test suite for RaftActor; the tests drive the MockRaftActor subclass declared below.
83 public class RaftActorTest extends AbstractActorTest {
// Used by the tests to generate actor ids and to create actors / TestActorRefs.
85 private TestActorFactory factory;
// Bind the factory to the actor system returned by getSystem().
89 factory = new TestActorFactory(getSystem());
93 public void tearDown() throws Exception {
// Empty the in-memory journal and snapshot store
// (presumably so persisted data does not carry over between tests).
95 InMemoryJournal.clear();
96 InMemorySnapshotStore.clear();
// RaftActor subclass used by the tests. Most overridden callbacks are forwarded to a Mockito
// mock ("delegate") so tests can verify them, and two latches let tests wait for recovery
// completion and for behavior initialization.
99 public static class MockRaftActor extends RaftActor {
// Provider returned by persistence(); either injected or a PersistentDataProvider (see constructor).
101 protected DataPersistenceProvider dataPersistenceProvider;
// Mockito mock of RaftActor that receives the forwarded callbacks.
102 private final RaftActor delegate;
// Counted down in onRecoveryComplete().
103 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Objects collected from recovery snapshots by applyRecoverySnapshot().
104 private final List<Object> state;
// May be null; exposed through getRoleChangeNotifier() as an Optional.
105 private ActorRef roleChangeNotifier;
// Counted down after super.initializeBehavior() returns.
106 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Akka Creator that captures the constructor arguments and builds the MockRaftActor on demand.
108 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
109 private static final long serialVersionUID = 1L;
110 private final Map<String, String> peerAddresses;
111 private final String id;
112 private final Optional<ConfigParams> config;
113 private final DataPersistenceProvider dataPersistenceProvider;
114 private final ActorRef roleChangeNotifier;
// Private: instances are obtained through the MockRaftActor.props(...) overloads.
116 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
117 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
118 ActorRef roleChangeNotifier) {
119 this.peerAddresses = peerAddresses;
121 this.config = config;
122 this.dataPersistenceProvider = dataPersistenceProvider;
123 this.roleChangeNotifier = roleChangeNotifier;
127 public MockRaftActor create() throws Exception {
128 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
129 dataPersistenceProvider);
// The notifier is not a constructor argument, so it is assigned after construction.
130 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
131 return mockRaftActor;
// Creates the actor with an empty recovered-state list and a fresh mock delegate.
135 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
136 DataPersistenceProvider dataPersistenceProvider) {
137 super(id, peerAddresses, config);
138 state = new ArrayList<>();
139 this.delegate = mock(RaftActor.class);
// A null provider falls back to a PersistentDataProvider; otherwise the supplied one is used.
140 if(dataPersistenceProvider == null){
141 this.dataPersistenceProvider = new PersistentDataProvider();
143 this.dataPersistenceProvider = dataPersistenceProvider;
// Asserts that onRecoveryComplete() is reached within 5 seconds.
147 public void waitForRecoveryComplete() {
149 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
150 } catch (InterruptedException e) {
// Asserts that initializeBehavior() finishes within 5 seconds.
155 public void waitForInitializeBehaviorComplete() {
157 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
158 } catch (InterruptedException e) {
// Loops at most 10 times with a 100 ms uninterruptible sleep per iteration.
164 public void waitUntilLeader(){
165 for(int i = 0;i < 10; i++){
169 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// Accessor for the objects recovered from snapshots.
173 public List<Object> getState() {
// Props with no injected persistence provider and no role-change notifier.
177 public static Props props(final String id, final Map<String, String> peerAddresses,
178 Optional<ConfigParams> config){
179 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
// Props with an injected persistence provider and no role-change notifier.
182 public static Props props(final String id, final Map<String, String> peerAddresses,
183 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
184 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
// Props with a role-change notifier and no injected persistence provider.
187 public static Props props(final String id, final Map<String, String> peerAddresses,
188 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
189 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
// Props with both a role-change notifier and an injected persistence provider.
192 public static Props props(final String id, final Map<String, String> peerAddresses,
193 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
194 DataPersistenceProvider dataPersistenceProvider){
195 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
// Forwards to the delegate so tests can verify the call, then logs it.
199 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
200 delegate.applyState(clientActor, identifier, data);
201 LOG.info("{}: applyState called", persistenceId());
205 protected void startLogRecoveryBatch(int maxBatchSize) {
209 protected void appendRecoveredLogEntry(Payload data) {
214 protected void applyCurrentLogRecoveryBatch() {
// Forwards to the delegate and releases waitForRecoveryComplete().
218 protected void onRecoveryComplete() {
219 delegate.onRecoveryComplete();
220 recoveryComplete.countDown();
// Runs the real initialization first, then releases waitForInitializeBehaviorComplete().
224 protected void initializeBehavior() {
225 super.initializeBehavior();
226 initializeBehaviorComplete.countDown();
// Forwards to the delegate, then deserializes the bytes; if the result is a List its
// elements are appended to the recovered state.
230 protected void applyRecoverySnapshot(byte[] bytes) {
231 delegate.applyRecoverySnapshot(bytes);
233 Object data = toObject(bytes);
234 if (data instanceof List) {
235 state.addAll((List<?>) data);
237 } catch (Exception e) {
// Logs and forwards to the delegate.
242 @Override protected void createSnapshot() {
243 LOG.info("{}: createSnapshot called", persistenceId());
244 delegate.createSnapshot();
// Logs and forwards to the delegate.
247 @Override protected void applySnapshot(byte [] snapshot) {
248 LOG.info("{}: applySnapshot called", persistenceId());
249 delegate.applySnapshot(snapshot);
252 @Override protected void onStateChanged() {
253 delegate.onStateChanged();
// Returns the provider selected in the constructor.
257 protected DataPersistenceProvider persistence() {
258 return this.dataPersistenceProvider;
// Absent when no notifier was configured.
262 protected Optional<ActorRef> getRoleChangeNotifier() {
263 return Optional.fromNullable(roleChangeNotifier);
266 @Override public String persistenceId() {
// Deserializes an object from the byte array using Java serialization (ObjectInputStream).
270 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
272 ByteArrayInputStream bis = null;
273 ObjectInputStream ois = null;
275 bis = new ByteArrayInputStream(bs);
276 ois = new ObjectInputStream(bis);
277 obj = ois.readObject();
// Shortcut for getRaftActorContext().getReplicatedLog().
289 public ReplicatedLog getReplicatedLog(){
290 return this.getRaftActorContext().getReplicatedLog();
// JavaTestKit that starts a MockRaftActor and offers helpers for waiting on it.
296 public static class RaftActorTestKit extends JavaTestKit {
// The MockRaftActor started by the constructor.
297 private final ActorRef raftActor;
// Starts a MockRaftActor named actorName with no peers and no explicit ConfigParams.
299 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
302 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
303 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
308 public ActorRef getRaftActor() {
// Runs an EventFilter for the given log event class against the raft actor.
312 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
313 // Wait for a specific log message to show up
315 new JavaTestKit.EventFilter<Boolean>(logEventClass
318 protected Boolean run() {
// Only events originating from the raft actor's path are matched; exactly one occurrence is expected.
321 }.from(raftActor.path().toString())
323 .occurrences(1).exec();
// Waits until the actor started by this kit reports a leader.
328 protected void waitUntilLeader(){
329 waitUntilLeader(raftActor);
// Repeatedly asks actorRef for its leader via FindLeader: at most 100 attempts (20 * 5),
// each ask limited to 100 ms. Fails the test if no leader is reported.
332 public static void waitUntilLeader(ActorRef actorRef) {
333 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
334 for(int i = 0; i < 20 * 5; i++) {
335 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
337 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
// A non-null leader actor in the reply means a leader is known.
338 if(resp.getLeaderActor() != null) {
// An ask timeout is ignored so the loop simply tries again; other failures are reported on stderr.
341 } catch(TimeoutException e) {
342 } catch(Exception e) {
343 System.err.println("FindLeader threw ex");
// 50 ms pause (presumably between attempts - the loop's closing brace is not visible here).
348 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
351 Assert.fail("Leader not found for actorRef " + actorRef.path());
// Smoke test: starts a MockRaftActor through RaftActorTestKit and waits until it reports a leader
// (the helper fails the test if none is found).
358 public void testConstruction() {
359 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
// Starts a single MockRaftActor with no peers and waits until FindLeader returns a leader.
363 public void testFindLeaderWhenLeaderIsSelf(){
364 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
365 kit.waitUntilLeader();
// Seeds the in-memory snapshot store and journal for an actor id, restarts the actor and
// checks the log size, last index, last applied, commit index and recovered state after recovery.
369 public void testRaftActorRecovery() throws Exception {
370 new JavaTestKit(getSystem()) {{
371 String persistenceId = factory.generateActorId("follower-");
373 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
374 // Set the heartbeat interval high to essentially disable election otherwise the test
375 // may fail if the actor is switched to Leader and the commitIndex is set to the last
377 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// First incarnation of the actor; it is stopped below once the persisted data is in place.
379 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
380 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
// Watch the actor so its Terminated message can be awaited after the PoisonPill.
382 watch(followerActor);
// One entry (payload "E") is stored in the snapshot as not yet applied.
384 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
385 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
386 new MockRaftActorContext.MockPayload("E"));
387 snapshotUnappliedEntries.add(entry1);
389 int lastAppliedDuringSnapshotCapture = 3;
390 int lastIndexDuringSnapshotCapture = 4;
392 // 4 messages as part of snapshot, which are applied to state
393 ByteString snapshotBytes = fromObject(Arrays.asList(
394 new MockRaftActorContext.MockPayload("A"),
395 new MockRaftActorContext.MockPayload("B"),
396 new MockRaftActorContext.MockPayload("C"),
397 new MockRaftActorContext.MockPayload("D")));
// Build the snapshot from the state bytes, the unapplied entry and the index values above,
// and register it for this persistenceId.
399 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
400 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
401 lastAppliedDuringSnapshotCapture, 1);
402 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
404 // add more entries after snapshot is taken
405 List<ReplicatedLogEntry> entries = new ArrayList<>();
406 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
407 new MockRaftActorContext.MockPayload("F"));
408 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
409 new MockRaftActorContext.MockPayload("G"));
410 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
411 new MockRaftActorContext.MockPayload("H"));
416 int lastAppliedToState = 5;
// Journal contents in sequence order: entry2, an ApplyJournalEntries marker, entry3, entry4.
419 InMemoryJournal.addEntry(persistenceId, 5, entry2);
420 // 2 entries are applied to state besides the 4 entries in snapshot
421 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
422 InMemoryJournal.addEntry(persistenceId, 7, entry3);
423 InMemoryJournal.addEntry(persistenceId, 8, entry4);
// Stop the first incarnation and wait for its termination before re-creating the actor.
426 followerActor.tell(PoisonPill.getInstance(), null);
427 expectMsgClass(duration("5 seconds"), Terminated.class);
429 unwatch(followerActor);
431 //reinstate the actor
432 TestActorRef<MockRaftActor> ref = factory.createTestActor(
433 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
434 Optional.<ConfigParams>of(config)));
// Block until the new incarnation signals onRecoveryComplete().
436 ref.underlyingActor().waitForRecoveryComplete();
// The recovered context must reflect both the snapshot and the journal entries.
438 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
439 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
440 context.getReplicatedLog().size());
441 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
442 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
443 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
444 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
// Recovery from a journal that contains an ApplyLogEntries message (per the test name, the
// pre-Lithium counterpart of ApplyJournalEntries): last applied and commit index must follow it.
449 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
450 new JavaTestKit(getSystem()) {{
451 String persistenceId = factory.generateActorId("leader-");
453 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// Same one-day heartbeat interval as the other tests in this class.
454 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
456 // Setup the persisted journal with some entries
457 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
458 new MockRaftActorContext.MockPayload("zero"));
459 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
460 new MockRaftActorContext.MockPayload("oen"));
461 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
462 new MockRaftActorContext.MockPayload("two"));
// Journal order: entry0, entry1, ApplyLogEntries(1), entry2.
465 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
466 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
467 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
468 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
// Matches the argument of the ApplyLogEntries message above.
470 int lastAppliedToState = 1;
473 //reinstate the actor
474 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
475 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
476 Optional.<ConfigParams>of(config)));
478 leaderActor.underlyingActor().waitForRecoveryComplete();
// All three entries are expected in the log, with last applied / commit index at 1.
480 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
481 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
482 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
483 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
484 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
489 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
490 * process recovery messages
// Feeds recovery messages directly into onReceiveRecover() of an actor created without an
// injected persistence provider and checks that each message changes the actor's state.
496 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
497 new JavaTestKit(getSystem()) {
499 String persistenceId = factory.generateActorId("leader-");
501 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
503 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
505 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
506 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
508 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
510 // Wait for akka's recovery to complete so it doesn't interfere.
511 mockRaftActor.waitForRecoveryComplete();
513 ByteString snapshotBytes = fromObject(Arrays.asList(
514 new MockRaftActorContext.MockPayload("A"),
515 new MockRaftActorContext.MockPayload("B"),
516 new MockRaftActorContext.MockPayload("C"),
517 new MockRaftActorContext.MockPayload("D")));
// Snapshot with the four payloads as state and no unapplied entries.
519 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
520 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// A SnapshotOffer must be handed to applyRecoverySnapshot() with the snapshot's state bytes.
522 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
524 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
// Each recovered log entry grows the replicated log by one.
526 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
528 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
530 assertEquals("add replicated log entry", 1, replicatedLog.size());
532 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
534 assertEquals("add replicated log entry", 2, replicatedLog.size());
// ApplyJournalEntries(1) is expected to move the commit index to 1.
536 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
538 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
540 // The snapshot had 4 items + we added 2 more items during the test
541 // We start removing from 5 and we should get 1 item in the replicated log
542 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
544 assertEquals("remove log entries", 1, replicatedLog.size());
// UpdateElectionTerm must be reflected in the context's term information.
546 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
548 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
549 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
// Finish by delivering a (mocked) RecoveryCompleted message.
551 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
557 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
558 * not process recovery messages
// Feeds recovery messages into onReceiveRecover() of an actor that uses a
// DataPersistenceProviderMonitor as its provider, and checks that none of them changes state.
// NOTE(review): presumably the monitor reports recovery as not applicable - confirm in
// DataPersistenceProviderMonitor.
563 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
564 new JavaTestKit(getSystem()) {
566 String persistenceId = factory.generateActorId("leader-");
568 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
570 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
572 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
573 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
575 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
577 // Wait for akka's recovery to complete so it doesn't interfere.
578 mockRaftActor.waitForRecoveryComplete();
580 ByteString snapshotBytes = fromObject(Arrays.asList(
581 new MockRaftActorContext.MockPayload("A"),
582 new MockRaftActorContext.MockPayload("B"),
583 new MockRaftActorContext.MockPayload("C"),
584 new MockRaftActorContext.MockPayload("D")));
586 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
587 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// The snapshot offer must not reach applyRecoverySnapshot().
589 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
591 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
// Recovered log entries must not be added to the replicated log.
593 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
595 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
597 assertEquals("add replicated log entry", 0, replicatedLog.size());
599 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
601 assertEquals("add replicated log entry", 0, replicatedLog.size());
// The commit index is expected to stay at -1.
603 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
605 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
607 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
609 assertEquals("remove log entries", 0, replicatedLog.size());
// The election term information must not take the values from the message.
611 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
613 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
614 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
616 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Checks that updateAndPersist() on the term information goes through the actor's
// persistence provider, observed via a latch installed on a DataPersistenceProviderMonitor.
622 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
623 new JavaTestKit(getSystem()) {
625 String persistenceId = factory.generateActorId("leader-");
627 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
629 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// The latch is handed to the monitor and awaited at the end of the test.
631 CountDownLatch persistLatch = new CountDownLatch(1);
632 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
633 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
635 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
636 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
638 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
640 mockRaftActor.waitForInitializeBehaviorComplete();
642 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
// The latch must be released within 5 seconds.
644 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Checks that appendAndPersist() on the replicated log persists the entry through the
// (mocked) persistence provider.
650 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
651 new JavaTestKit(getSystem()) {
653 String persistenceId = factory.generateActorId("leader-");
655 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
657 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Mockito mock so the persist() call can be verified.
659 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
661 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
662 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
664 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
666 mockRaftActor.waitForInitializeBehaviorComplete();
668 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
670 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
// persist() must have been called with exactly this entry and some callback.
672 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
// Checks that appending and then removing a log entry both go through the (mocked)
// persistence provider.
678 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
679 new JavaTestKit(getSystem()) {
681 String persistenceId = factory.generateActorId("leader-");
683 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
685 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
687 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
689 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
690 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
692 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
694 mockRaftActor.waitForInitializeBehaviorComplete();
696 mockRaftActor.waitUntilLeader();
698 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
700 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
// NOTE(review): three persist() calls are expected in total; presumably one happens before the
// append (e.g. while the actor becomes leader) in addition to the append and the removal - confirm.
702 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
// Checks that handling an ApplyJournalEntries command results in a persist() call on the
// (mocked) persistence provider.
708 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
709 new JavaTestKit(getSystem()) {
711 String persistenceId = factory.generateActorId("leader-");
713 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
715 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
717 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
719 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
720 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
722 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
724 mockRaftActor.waitForInitializeBehaviorComplete();
726 mockRaftActor.waitUntilLeader();
// Deliver the command directly to the actor's command handler.
728 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
// NOTE(review): two persist() calls are expected in total; presumably one precedes the command
// (e.g. while the actor becomes leader) - confirm.
730 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
// Checks that a CaptureSnapshotReply, received after a CaptureSnapshot, makes the actor call
// saveSnapshot() on the (mocked) persistence provider.
738 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
739 new JavaTestKit(getSystem()) {
741 String persistenceId = factory.generateActorId("leader-");
743 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
745 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
747 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
749 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
750 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
751 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
753 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
755 mockRaftActor.waitForInitializeBehaviorComplete();
// Serialized payload list used as the snapshot state in the reply below.
757 ByteString snapshotBytes = fromObject(Arrays.asList(
758 new MockRaftActorContext.MockPayload("A"),
759 new MockRaftActorContext.MockPayload("B"),
760 new MockRaftActorContext.MockPayload("C"),
761 new MockRaftActorContext.MockPayload("D")));
// Start a capture, then force the Leader behavior before delivering the reply.
763 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
765 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
767 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
769 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
771 verify(dataPersistenceProvider).saveSnapshot(anyObject());
// Runs a full capture-snapshot sequence (CaptureSnapshot, CaptureSnapshotReply,
// SaveSnapshotSuccess) on a Follower and checks that old snapshots and messages are deleted
// through the (mocked) persistence provider and that the replicated log is trimmed.
778 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
779 new JavaTestKit(getSystem()) {
781 String persistenceId = factory.generateActorId("leader-");
783 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
785 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
// Unlike most tests in this class, one peer ("leader" -> "fake/path") is configured here.
789 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
790 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
792 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
794 mockRaftActor.waitForInitializeBehaviorComplete();
// Populate the log with five entries; the assertions below look them up as 0..4.
796 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
797 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
798 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
799 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
800 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
802 ByteString snapshotBytes = fromObject(Arrays.asList(
803 new MockRaftActorContext.MockPayload("A"),
804 new MockRaftActorContext.MockPayload("B"),
805 new MockRaftActorContext.MockPayload("C"),
806 new MockRaftActorContext.MockPayload("D")));
// Force the Follower behavior for the rest of the test.
808 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
809 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
811 long replicatedToAllIndex = 1;
812 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
// CaptureSnapshot must trigger createSnapshot() on the actor.
814 verify(mockRaftActor.delegate).createSnapshot();
816 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
818 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
820 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
// 100 matches the value passed in the SnapshotMetadata above.
822 verify(dataPersistenceProvider).deleteMessages(100);
// Three entries remain and the behavior's replicatedToAllIndex equals the value sent above.
824 assertEquals(3, mockRaftActor.getReplicatedLog().size());
825 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
827 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
828 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
829 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
831 // Indices 0 and 1 will not be in the log because they were removed due to snapshotting
832 assertNull(mockRaftActor.getReplicatedLog().get(1));
833 assertNull(mockRaftActor.getReplicatedLog().get(0));
// Checks that an ApplyState command is passed on to applyState() with the same client actor
// and identifier.
840 public void testApplyState() throws Exception {
842 new JavaTestKit(getSystem()) {
844 String persistenceId = factory.generateActorId("leader-");
846 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
848 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
850 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
852 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
853 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
855 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
857 mockRaftActor.waitForInitializeBehaviorComplete();
859 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
860 new MockRaftActorContext.MockPayload("F"));
// The actor's own ref is used as the client actor of the ApplyState message.
862 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
// Only the actor and identifier are matched exactly; the data argument may be anything.
864 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
// Checks that an ApplySnapshot command hands the snapshot state to applySnapshot(), replaces
// the replicated log with an empty one and sets lastApplied from the snapshot.
871 public void testApplySnapshot() throws Exception {
872 new JavaTestKit(getSystem()) {
874 String persistenceId = factory.generateActorId("leader-");
876 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
878 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
880 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
882 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
883 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
885 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
887 mockRaftActor.waitForInitializeBehaviorComplete();
// Keep a reference to the current log so the test can detect that it was replaced.
889 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
// Three entries that should no longer be present after the snapshot is applied.
891 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
892 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
893 oldReplicatedLog.append(
894 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
895 mock(Payload.class)));
897 ByteString snapshotBytes = fromObject(Arrays.asList(
898 new MockRaftActorContext.MockPayload("A"),
899 new MockRaftActorContext.MockPayload("B"),
900 new MockRaftActorContext.MockPayload("C"),
901 new MockRaftActorContext.MockPayload("D")));
// Mocked Snapshot: only getState() and getLastAppliedIndex() are stubbed.
903 Snapshot snapshot = mock(Snapshot.class);
905 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
907 doReturn(3L).when(snapshot).getLastAppliedIndex();
909 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
911 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
// Reference comparison: the actor must now hold a different ReplicatedLog instance.
913 assertTrue("The replicatedLog should have changed",
914 oldReplicatedLog != mockRaftActor.getReplicatedLog());
916 assertEquals("lastApplied should be same as in the snapshot",
917 (Long) 3L, mockRaftActor.getLastApplied());
919 assertEquals(0, mockRaftActor.getReplicatedLog().size());
/**
 * Drives a snapshot capture (CaptureSnapshot followed by CaptureSnapshotReply) on an actor
 * whose behavior is set to Leader, then delivers a SaveSnapshotFailure and checks that the
 * replicated log's snapshot index is still -1.
 */
926 public void testSaveSnapshotFailure() throws Exception {
927 new JavaTestKit(getSystem()) {
929 String persistenceId = factory.generateActorId("leader-");
931 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day heartbeat interval; presumably keeps heartbeat timers from firing during the test.
933 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
935 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
937 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
938 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
940 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
942 mockRaftActor.waitForInitializeBehaviorComplete();
// Serialized list of four payloads used as the captured snapshot state.
944 ByteString snapshotBytes = fromObject(Arrays.asList(
945 new MockRaftActorContext.MockPayload("A"),
946 new MockRaftActorContext.MockPayload("B"),
947 new MockRaftActorContext.MockPayload("C"),
948 new MockRaftActorContext.MockPayload("D")));
950 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
// Install a Leader behavior directly instead of waiting for an election.
952 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
// Begin the capture, then supply the captured state.
954 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
956 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// Report that persisting the snapshot failed.
958 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
961 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
962 mockRaftActor.getReplicatedLog().getSnapshotIndex());
/**
 * Checks the notifications delivered to the role-change notifier actor for a raft actor
 * created with an empty peer map: three RoleChanged messages (null to Follower, Follower to
 * Candidate, Candidate to Leader) and a LeaderStateChanged naming the actor itself. It then
 * swaps in a custom Follower behavior and checks the notifications produced by that change and
 * by a subsequent message that makes the follower report a new leader id.
 */
969 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
970 new JavaTestKit(getSystem()) {{
971 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
972 Props.create(MessageCollectorActor.class));
973 MessageCollectorActor.waitUntilReady(notifierActor);
975 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
976 long heartBeatInterval = 100;
977 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
978 config.setElectionTimeoutFactor(20);
980 String persistenceId = factory.generateActorId("notifier-");
// No peers are configured (empty map) and the notifier actor is passed in to receive events.
982 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
983 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
984 new NonPersistentProvider()), persistenceId);
// Expect three RoleChanged messages to have been collected by the notifier.
986 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
989 // check if the notifier got a role change from null to Follower
990 RoleChanged raftRoleChanged = matches.get(0);
991 assertEquals(persistenceId, raftRoleChanged.getMemberId());
992 assertNull(raftRoleChanged.getOldRole());
993 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
995 // check if the notifier got a role change from Follower to Candidate
996 raftRoleChanged = matches.get(1);
997 assertEquals(persistenceId, raftRoleChanged.getMemberId());
998 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
999 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1001 // check if the notifier got a role change from Candidate to Leader
1002 raftRoleChanged = matches.get(2);
1003 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1004 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1005 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
// The leader announced to the notifier must be the member from the last RoleChanged.
1007 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1008 notifierActor, LeaderStateChanged.class);
1010 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
// Presumably discards the collected messages so later expectations only see new ones.
1012 notifierActor.underlyingActor().clear();
1014 MockRaftActor raftActor = raftActorRef.underlyingActor();
1015 final String newLeaderId = "new-leader";
// Follower whose handleMessage assigns newLeaderId to its leaderId field.
1016 Follower follower = new Follower(raftActor.getRaftActorContext()) {
1018 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1019 leaderId = newLeaderId;
1024 raftActor.changeCurrentBehavior(follower);
// Switching to the follower must be reported with no leader id ...
1026 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1027 assertEquals(persistenceId, leaderStateChange.getMemberId());
1028 assertEquals(null, leaderStateChange.getLeaderId());
// ... and as a role change from Leader to Follower.
1030 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1031 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1032 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1034 notifierActor.underlyingActor().clear();
// Any message will do here; presumably it is routed to the overridden handleMessage above,
// which sets the new leader id.
1036 raftActor.handleCommand("any");
1038 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1039 assertEquals(persistenceId, leaderStateChange.getMemberId());
1040 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
/**
 * Checks the notifications delivered to the role-change notifier actor for a raft actor that
 * has one configured peer ("leader" at "fake/path"): exactly two RoleChanged messages are
 * expected, null to Follower and Follower to Candidate.
 */
1045 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1046 new JavaTestKit(getSystem()) {{
1047 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1048 MessageCollectorActor.waitUntilReady(notifierActor);
1050 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1051 long heartBeatInterval = 100;
1052 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1053 config.setElectionTimeoutFactor(1);
1055 String persistenceId = factory.generateActorId("notifier-");
1057 factory.createActor(MockRaftActor.props(persistenceId,
1058 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1060 List<RoleChanged> matches = null;
// Poll the notifier 5000 / heartBeatInterval times, sleeping heartBeatInterval ms between
// polls (about 5 seconds with the 100 ms interval above).
// NOTE(review): the loop looks for 3 matches while the assertion after it expects 2.
// Presumably this is deliberate, so that the test waits out the whole window and fails if a
// third role change ever arrives - confirm.
1061 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1062 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1063 assertNotNull(matches);
1064 if(matches.size() == 3) {
1067 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1070 assertEquals(2, matches.size());
1072 // check if the notifier got a role change from null to Follower
1073 RoleChanged raftRoleChanged = matches.get(0);
1074 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1075 assertNull(raftRoleChanged.getOldRole());
1076 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1078 // check if the notifier got a role change from Follower to Candidate
1079 raftRoleChanged = matches.get(1);
1080 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1081 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1082 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
/**
 * Checks how a leader's replicated log is trimmed around a real snapshot capture. While the
 * capture is marked as initiated, AppendEntriesReply messages from the follower must leave the
 * 8-entry log untouched; the CaptureSnapshotReply must shrink it to 3 entries (last index 7);
 * a later AppendEntriesReply for index 7 must shrink it to 2 entries (last index 8).
 */
1088 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1089 new JavaTestKit(getSystem()) {
1091 String persistenceId = factory.generateActorId("leader-");
1092 String follower1Id = factory.generateActorId("follower-");
// The follower is a MessageCollectorActor standing in for a real peer.
1094 ActorRef followerActor1 =
1095 factory.createActor(Props.create(MessageCollectorActor.class));
1097 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably keep the periodic timers from firing during the test.
1098 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1099 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1101 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1103 Map<String, String> peerAddresses = new HashMap<>();
1104 peerAddresses.put(follower1Id, followerActor1.path().toString());
1106 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1107 MockRaftActor.props(persistenceId, peerAddresses,
1108 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1110 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Seed the context: commit index and lastApplied at 4, term information set to (1, persistenceId).
1112 leaderActor.getRaftActorContext().setCommitIndex(4);
1113 leaderActor.getRaftActorContext().setLastApplied(4);
1114 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1116 leaderActor.waitForInitializeBehaviorComplete();
1118 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1120 Leader leader = new Leader(leaderActor.getRaftActorContext());
1121 leaderActor.setCurrentBehavior(leader);
1122 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Replace the log with one built by createEntries(0, 8, 1); it must hold 8 entries.
1124 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1125 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1127 assertEquals(8, leaderActor.getReplicatedLog().size());
// Start the capture. The arguments are presumably (lastIndex, lastTerm, lastAppliedIndex,
// lastAppliedTerm, replicatedToAllIndex, replicatedToAllTerm) - confirm against CaptureSnapshot.
1129 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
// The test marks the capture as initiated itself and checks the delegate was asked for a snapshot.
1131 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1132 verify(leaderActor.delegate).createSnapshot();
1134 assertEquals(8, leaderActor.getReplicatedLog().size());
1136 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1137 //fake snapshot on index 5
1138 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1140 assertEquals(8, leaderActor.getReplicatedLog().size());
1142 //fake snapshot on index 6
1143 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1144 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1145 assertEquals(8, leaderActor.getReplicatedLog().size());
1147 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1149 assertEquals(8, leaderActor.getReplicatedLog().size());
// Complete the capture with the serialized state; the initiated flag must be cleared afterwards.
1151 ByteString snapshotBytes = fromObject(Arrays.asList(
1152 new MockRaftActorContext.MockPayload("foo-0"),
1153 new MockRaftActorContext.MockPayload("foo-1"),
1154 new MockRaftActorContext.MockPayload("foo-2"),
1155 new MockRaftActorContext.MockPayload("foo-3"),
1156 new MockRaftActorContext.MockPayload("foo-4")));
1157 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1158 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1160 // capture snapshot reply should remove the snapshotted entries only
1161 assertEquals(3, leaderActor.getReplicatedLog().size());
1162 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1164 // add another non-replicated entry
1165 leaderActor.getReplicatedLog().append(
1166 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1168 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1169 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1170 assertEquals(2, leaderActor.getReplicatedLog().size());
1171 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
/**
 * Follower-side counterpart of the leader test: while a snapshot capture is marked as
 * initiated, AppendEntries messages from the leader must only grow the log (6, then 7, then 8
 * entries); the CaptureSnapshotReply must shrink it to 3 entries (last index 7); a further
 * AppendEntries must leave 2 entries in the log.
 */
1178 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1179 new JavaTestKit(getSystem()) {
1181 String persistenceId = factory.generateActorId("follower-");
1182 String leaderId = factory.generateActorId("leader-");
// The leader is a MessageCollectorActor standing in for a real peer.
1185 ActorRef leaderActor1 =
1186 factory.createActor(Props.create(MessageCollectorActor.class));
1188 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably keep the periodic timers from firing during the test.
1189 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1190 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1192 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1194 Map<String, String> peerAddresses = new HashMap<>();
1195 peerAddresses.put(leaderId, leaderActor1.path().toString());
1197 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1198 MockRaftActor.props(persistenceId, peerAddresses,
1199 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1201 MockRaftActor followerActor = mockActorRef.underlyingActor();
// Seed the context: commit index and lastApplied at 4, term information set to (1, persistenceId).
1202 followerActor.getRaftActorContext().setCommitIndex(4);
1203 followerActor.getRaftActorContext().setLastApplied(4);
1204 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1206 followerActor.waitForInitializeBehaviorComplete();
// Install a Follower behavior explicitly.
1209 Follower follower = new Follower(followerActor.getRaftActorContext());
1210 followerActor.setCurrentBehavior(follower);
1211 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1213 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1214 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1215 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1217 // log has indices 0-5
1218 assertEquals(6, followerActor.getReplicatedLog().size());
// Start the capture; the test marks it as initiated itself and checks the delegate was asked
// for a snapshot.
1221 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1223 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1224 verify(followerActor.delegate).createSnapshot();
1226 assertEquals(6, followerActor.getReplicatedLog().size());
1228 //fake snapshot on index 6
1229 List<ReplicatedLogEntry> entries =
1231 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1232 new MockRaftActorContext.MockPayload("foo-6"))
1234 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1235 assertEquals(7, followerActor.getReplicatedLog().size());
1237 //fake snapshot on index 7
1238 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1242 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1243 new MockRaftActorContext.MockPayload("foo-7"))
1245 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1246 assertEquals(8, followerActor.getReplicatedLog().size());
1248 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
// Complete the capture with the serialized state; the initiated flag must be cleared afterwards.
1251 ByteString snapshotBytes = fromObject(Arrays.asList(
1252 new MockRaftActorContext.MockPayload("foo-0"),
1253 new MockRaftActorContext.MockPayload("foo-1"),
1254 new MockRaftActorContext.MockPayload("foo-2"),
1255 new MockRaftActorContext.MockPayload("foo-3"),
1256 new MockRaftActorContext.MockPayload("foo-4")));
1257 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1258 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1260 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1261 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1262 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// NOTE(review): this entry is created with 8 as its second argument but reuses the payload
// label "foo-7"; it looks like a copy/paste leftover - confirm whether "foo-8" was intended.
1266 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1267 new MockRaftActorContext.MockPayload("foo-7"))
1269 // send an additional entry 8 with leaderCommit = 7
1270 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1272 // 7 and 8, as lastapplied is 7
1273 assertEquals(2, followerActor.getReplicatedLog().size());
/**
 * Checks a leader with two followers whose log holds 5 entries, with the snapshot index set
 * to 4. AppendEntriesReply messages and a SendHeartBeat must leave the log at 5 entries; the
 * CaptureSnapshotReply must then empty the log; a later reply from the slow follower must leave
 * it empty.
 */
1280 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1281 new JavaTestKit(getSystem()) {
1283 String persistenceId = factory.generateActorId("leader-");
1284 String follower1Id = factory.generateActorId("follower-");
1285 String follower2Id = factory.generateActorId("follower-");
// Both followers are MessageCollectorActors, created under their generated ids.
1287 ActorRef followerActor1 =
1288 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1289 ActorRef followerActor2 =
1290 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1292 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably keep the periodic timers from firing during the test.
1293 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1294 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1296 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1298 Map<String, String> peerAddresses = new HashMap<>();
1299 peerAddresses.put(follower1Id, followerActor1.path().toString());
1300 peerAddresses.put(follower2Id, followerActor2.path().toString());
1302 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1303 MockRaftActor.props(persistenceId, peerAddresses,
1304 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1306 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Seed the context: commit index and lastApplied at 9, term information set to (1, persistenceId).
1307 leaderActor.getRaftActorContext().setCommitIndex(9);
1308 leaderActor.getRaftActorContext().setLastApplied(9);
1309 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1311 leaderActor.waitForInitializeBehaviorComplete();
1313 Leader leader = new Leader(leaderActor.getRaftActorContext());
1314 leaderActor.setCurrentBehavior(leader);
1315 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1317 // create 5 entries in the log
1318 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1319 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1321 //set the snapshot index to 4 , 0 to 4 are snapshotted
1322 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1323 //setting replicatedToAllIndex = 9, for the log to clear
1324 leader.setReplicatedToAllIndex(9);
1325 assertEquals(5, leaderActor.getReplicatedLog().size());
1326 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Reply from the first follower for index 9; the log must be unchanged.
1328 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1329 assertEquals(5, leaderActor.getReplicatedLog().size());
1330 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1332 // set the 2nd follower nextIndex to 1 which has been snapshotted
1333 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1334 assertEquals(5, leaderActor.getReplicatedLog().size());
1335 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1337 // simulate a real snapshot
1338 leaderActor.onReceiveCommand(new SendHeartBeat());
1339 assertEquals(5, leaderActor.getReplicatedLog().size());
// Same Leader-state check as above, with a message reporting the actual state and leader id.
1340 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1341 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1342 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1345 //reply from a slow follower does not initiate a fake snapshot
1346 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1347 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
// Deliver the captured state; the initiated flag must be cleared afterwards.
1349 ByteString snapshotBytes = fromObject(Arrays.asList(
1350 new MockRaftActorContext.MockPayload("foo-0"),
1351 new MockRaftActorContext.MockPayload("foo-1"),
1352 new MockRaftActorContext.MockPayload("foo-2"),
1353 new MockRaftActorContext.MockPayload("foo-3"),
1354 new MockRaftActorContext.MockPayload("foo-4")));
1355 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1356 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1358 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1360 //reply from a slow follower after should not raise errors
1361 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1362 assertEquals(0, leaderActor.getReplicatedLog().size());
/**
 * DataPersistenceProvider implementation used by the tests in this class that construct it
 * directly instead of using a Mockito mock. The class name suggests that nothing is actually
 * persisted - confirm against the method bodies.
 */
1368 private static class NonPersistentProvider implements DataPersistenceProvider {
1370 public boolean isRecoveryApplicable() {
1375 public <T> void persist(T o, Procedure<T> procedure) {
// NOTE(review): an Exception raised in the try block is only printed to stderr and not
// propagated, so it would not fail the calling test - consider rethrowing or logging.
1378 } catch (Exception e) {
1379 e.printStackTrace();
1384 public void saveSnapshot(Object o) {
1389 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1394 public void deleteMessages(long sequenceNumber) {
/**
 * Checks a snapshot on a peerless leader whose replicatedToAllIndex is never set by the test:
 * after persisting one more entry and delivering a CaptureSnapshotReply, the snapshot index
 * and the leader's replicatedToAllIndex must both be -1 and the capture flag must be cleared.
 */
1400 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1401 new JavaTestKit(getSystem()) {{
1402 String persistenceId = factory.generateActorId("leader-");
1403 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably keep the periodic timers from firing during the test.
1404 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1405 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Batch count of 5; presumably a capture is triggered once the log reaches that many
// entries - confirm against the snapshot trigger logic.
1406 config.setSnapshotBatchCount(5);
1408 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
// Empty peer map: this leader has no followers.
1410 Map<String, String> peerAddresses = new HashMap<>();
1412 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1413 MockRaftActor.props(persistenceId, peerAddresses,
1414 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1416 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Seed the context: commit index and lastApplied at 3, term information set to (1, persistenceId).
1417 leaderActor.getRaftActorContext().setCommitIndex(3);
1418 leaderActor.getRaftActorContext().setLastApplied(3);
1419 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1421 leaderActor.waitForInitializeBehaviorComplete();
// Append four entries (loop variable 0..3 as the second constructor argument).
1422 for(int i=0;i< 4;i++) {
1423 leaderActor.getReplicatedLog()
1424 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1425 new MockRaftActorContext.MockPayload("A")));
1428 Leader leader = new Leader(leaderActor.getRaftActorContext());
1429 leaderActor.setCurrentBehavior(leader);
1430 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1432 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1433 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1435 // Now send a CaptureSnapshotReply
// Sent with tell() rather than onReceiveCommand(); presumably still handled before the
// assertions run because mockActorRef is a TestActorRef - confirm.
1436 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1438 // Trimming log in this scenario is a no-op
1439 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1440 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1441 assertEquals(-1, leader.getReplicatedToAllIndex());
/**
 * Checks a snapshot on a peerless leader whose snapshot index and replicatedToAllIndex are
 * both set to 3 while the test appends no entries to the log itself: after persisting one
 * entry and delivering a CaptureSnapshotReply, both values must still be 3 and the capture
 * flag must be cleared.
 */
1447 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1448 new JavaTestKit(getSystem()) {{
1449 String persistenceId = factory.generateActorId("leader-");
1450 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals; presumably keep the periodic timers from firing during the test.
1451 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1452 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1453 config.setSnapshotBatchCount(5);
1455 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
// Empty peer map: this leader has no followers.
1457 Map<String, String> peerAddresses = new HashMap<>();
1459 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1460 MockRaftActor.props(persistenceId, peerAddresses,
1461 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1463 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Seed the context: commit index, lastApplied and the log's snapshot index all at 3.
1464 leaderActor.getRaftActorContext().setCommitIndex(3);
1465 leaderActor.getRaftActorContext().setLastApplied(3);
1466 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1467 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1469 leaderActor.waitForInitializeBehaviorComplete();
1470 Leader leader = new Leader(leaderActor.getRaftActorContext());
1471 leaderActor.setCurrentBehavior(leader);
// Index 3 is covered by the snapshot index set above, not by an entry appended in this test.
1472 leader.setReplicatedToAllIndex(3);
1473 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1475 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1476 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1478 // Now send a CaptureSnapshotReply
// Sent with tell() rather than onReceiveCommand(); presumably still handled before the
// assertions run because mockActorRef is a TestActorRef - confirm.
1479 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1481 // Trimming log in this scenario is a no-op
1482 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1483 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1484 assertEquals(3, leader.getReplicatedToAllIndex());
1489 private ByteString fromObject(Object snapshot) throws Exception {
1490 ByteArrayOutputStream b = null;
1491 ObjectOutputStream o = null;
1493 b = new ByteArrayOutputStream();
1494 o = new ObjectOutputStream(b);
1495 o.writeObject(snapshot);
1496 byte[] snapshotBytes = b.toByteArray();
1497 return ByteString.copyFrom(snapshotBytes);