1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.opendaylight.controller.cluster.DataPersistenceProvider;
56 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
57 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
58 import org.opendaylight.controller.cluster.notifications.RoleChanged;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
63 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
64 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
66 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
67 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
68 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
69 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
71 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
73 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.duration.Duration;
80 import scala.concurrent.duration.FiniteDuration;
82 public class RaftActorTest extends AbstractActorTest {
84 private TestActorFactory factory;
88 factory = new TestActorFactory(getSystem());
92 public void tearDown() throws Exception {
94 InMemoryJournal.clear();
95 InMemorySnapshotStore.clear();
98 public static class MockRaftActor extends RaftActor {
100 protected DataPersistenceProvider dataPersistenceProvider;
101 private final RaftActor delegate;
102 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
103 private final List<Object> state;
104 private ActorRef roleChangeNotifier;
105 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
107 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
108 private static final long serialVersionUID = 1L;
109 private final Map<String, String> peerAddresses;
110 private final String id;
111 private final Optional<ConfigParams> config;
112 private final DataPersistenceProvider dataPersistenceProvider;
113 private final ActorRef roleChangeNotifier;
115 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
116 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
117 ActorRef roleChangeNotifier) {
118 this.peerAddresses = peerAddresses;
120 this.config = config;
121 this.dataPersistenceProvider = dataPersistenceProvider;
122 this.roleChangeNotifier = roleChangeNotifier;
126 public MockRaftActor create() throws Exception {
127 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
128 dataPersistenceProvider);
129 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
130 return mockRaftActor;
134 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
135 DataPersistenceProvider dataPersistenceProvider) {
136 super(id, peerAddresses, config);
137 state = new ArrayList<>();
138 this.delegate = mock(RaftActor.class);
139 if(dataPersistenceProvider == null){
140 this.dataPersistenceProvider = new PersistentDataProvider();
142 this.dataPersistenceProvider = dataPersistenceProvider;
146 public void waitForRecoveryComplete() {
148 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
149 } catch (InterruptedException e) {
154 public void waitForInitializeBehaviorComplete() {
156 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
157 } catch (InterruptedException e) {
162 public List<Object> getState() {
166 public static Props props(final String id, final Map<String, String> peerAddresses,
167 Optional<ConfigParams> config){
168 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
171 public static Props props(final String id, final Map<String, String> peerAddresses,
172 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
173 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
176 public static Props props(final String id, final Map<String, String> peerAddresses,
177 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
178 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
181 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
182 delegate.applyState(clientActor, identifier, data);
183 LOG.info("{}: applyState called", persistenceId());
187 protected void startLogRecoveryBatch(int maxBatchSize) {
191 protected void appendRecoveredLogEntry(Payload data) {
196 protected void applyCurrentLogRecoveryBatch() {
200 protected void onRecoveryComplete() {
201 delegate.onRecoveryComplete();
202 recoveryComplete.countDown();
206 protected void initializeBehavior() {
207 super.initializeBehavior();
208 initializeBehaviorComplete.countDown();
212 protected void applyRecoverySnapshot(byte[] bytes) {
213 delegate.applyRecoverySnapshot(bytes);
215 Object data = toObject(bytes);
216 if (data instanceof List) {
217 state.addAll((List<?>) data);
219 } catch (Exception e) {
224 @Override protected void createSnapshot() {
225 LOG.info("{}: createSnapshot called", persistenceId());
226 delegate.createSnapshot();
229 @Override protected void applySnapshot(byte [] snapshot) {
230 LOG.info("{}: applySnapshot called", persistenceId());
231 delegate.applySnapshot(snapshot);
234 @Override protected void onStateChanged() {
235 delegate.onStateChanged();
239 protected DataPersistenceProvider persistence() {
240 return this.dataPersistenceProvider;
244 protected Optional<ActorRef> getRoleChangeNotifier() {
245 return Optional.fromNullable(roleChangeNotifier);
248 @Override public String persistenceId() {
252 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
254 ByteArrayInputStream bis = null;
255 ObjectInputStream ois = null;
257 bis = new ByteArrayInputStream(bs);
258 ois = new ObjectInputStream(bis);
259 obj = ois.readObject();
271 public ReplicatedLog getReplicatedLog(){
272 return this.getRaftActorContext().getReplicatedLog();
278 public static class RaftActorTestKit extends JavaTestKit {
279 private final ActorRef raftActor;
281 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
284 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
285 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
290 public ActorRef getRaftActor() {
294 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
295 // Wait for a specific log message to show up
297 new JavaTestKit.EventFilter<Boolean>(logEventClass
300 protected Boolean run() {
303 }.from(raftActor.path().toString())
305 .occurrences(1).exec();
310 protected void waitUntilLeader(){
311 waitUntilLeader(raftActor);
314 public static void waitUntilLeader(ActorRef actorRef) {
315 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
316 for(int i = 0; i < 20 * 5; i++) {
317 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
319 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
320 if(resp.getLeaderActor() != null) {
323 } catch(TimeoutException e) {
324 } catch(Exception e) {
325 System.err.println("FindLeader threw ex");
330 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
333 Assert.fail("Leader not found for actorRef " + actorRef.path());
340 public void testConstruction() {
341 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
345 public void testFindLeaderWhenLeaderIsSelf(){
346 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
347 kit.waitUntilLeader();
351 public void testRaftActorRecovery() throws Exception {
352 new JavaTestKit(getSystem()) {{
353 String persistenceId = factory.generateActorId("follower-");
355 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
356 // Set the heartbeat interval high to essentially disable election otherwise the test
357 // may fail if the actor is switched to Leader and the commitIndex is set to the last
359 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
361 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
362 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
364 watch(followerActor);
366 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
367 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
368 new MockRaftActorContext.MockPayload("E"));
369 snapshotUnappliedEntries.add(entry1);
371 int lastAppliedDuringSnapshotCapture = 3;
372 int lastIndexDuringSnapshotCapture = 4;
374 // 4 messages as part of snapshot, which are applied to state
375 ByteString snapshotBytes = fromObject(Arrays.asList(
376 new MockRaftActorContext.MockPayload("A"),
377 new MockRaftActorContext.MockPayload("B"),
378 new MockRaftActorContext.MockPayload("C"),
379 new MockRaftActorContext.MockPayload("D")));
381 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
382 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
383 lastAppliedDuringSnapshotCapture, 1);
384 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
386 // add more entries after snapshot is taken
387 List<ReplicatedLogEntry> entries = new ArrayList<>();
388 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
389 new MockRaftActorContext.MockPayload("F"));
390 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
391 new MockRaftActorContext.MockPayload("G"));
392 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
393 new MockRaftActorContext.MockPayload("H"));
398 int lastAppliedToState = 5;
401 InMemoryJournal.addEntry(persistenceId, 5, entry2);
402 // 2 entries are applied to state besides the 4 entries in snapshot
403 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
404 InMemoryJournal.addEntry(persistenceId, 7, entry3);
405 InMemoryJournal.addEntry(persistenceId, 8, entry4);
408 followerActor.tell(PoisonPill.getInstance(), null);
409 expectMsgClass(duration("5 seconds"), Terminated.class);
411 unwatch(followerActor);
413 //reinstate the actor
414 TestActorRef<MockRaftActor> ref = factory.createTestActor(
415 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
416 Optional.<ConfigParams>of(config)));
418 ref.underlyingActor().waitForRecoveryComplete();
420 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
421 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
422 context.getReplicatedLog().size());
423 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
424 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
425 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
426 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
431 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
432 new JavaTestKit(getSystem()) {{
433 String persistenceId = factory.generateActorId("leader-");
435 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
436 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
438 // Setup the persisted journal with some entries
439 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
440 new MockRaftActorContext.MockPayload("zero"));
441 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
442 new MockRaftActorContext.MockPayload("oen"));
443 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
444 new MockRaftActorContext.MockPayload("two"));
447 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
448 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
449 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
450 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
452 int lastAppliedToState = 1;
455 //reinstate the actor
456 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
457 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
458 Optional.<ConfigParams>of(config)));
460 leaderActor.underlyingActor().waitForRecoveryComplete();
462 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
463 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
464 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
465 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
466 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
471 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
472 * process recovery messages
478 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
479 new JavaTestKit(getSystem()) {
481 String persistenceId = factory.generateActorId("leader-");
483 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
485 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
487 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
488 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
490 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
492 // Wait for akka's recovery to complete so it doesn't interfere.
493 mockRaftActor.waitForRecoveryComplete();
495 ByteString snapshotBytes = fromObject(Arrays.asList(
496 new MockRaftActorContext.MockPayload("A"),
497 new MockRaftActorContext.MockPayload("B"),
498 new MockRaftActorContext.MockPayload("C"),
499 new MockRaftActorContext.MockPayload("D")));
501 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
502 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
504 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
506 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
508 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
510 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
512 assertEquals("add replicated log entry", 1, replicatedLog.size());
514 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
516 assertEquals("add replicated log entry", 2, replicatedLog.size());
518 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
520 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
522 // The snapshot had 4 items + we added 2 more items during the test
523 // We start removing from 5 and we should get 1 item in the replicated log
524 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
526 assertEquals("remove log entries", 1, replicatedLog.size());
528 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
530 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
531 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
533 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
539 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
540 * not process recovery messages
545 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
546 new JavaTestKit(getSystem()) {
548 String persistenceId = factory.generateActorId("leader-");
550 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
552 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
554 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
555 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
557 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
559 // Wait for akka's recovery to complete so it doesn't interfere.
560 mockRaftActor.waitForRecoveryComplete();
562 ByteString snapshotBytes = fromObject(Arrays.asList(
563 new MockRaftActorContext.MockPayload("A"),
564 new MockRaftActorContext.MockPayload("B"),
565 new MockRaftActorContext.MockPayload("C"),
566 new MockRaftActorContext.MockPayload("D")));
568 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
569 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
571 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
573 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
575 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
577 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
579 assertEquals("add replicated log entry", 0, replicatedLog.size());
581 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
583 assertEquals("add replicated log entry", 0, replicatedLog.size());
585 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
587 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
589 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
591 assertEquals("remove log entries", 0, replicatedLog.size());
593 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
595 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
596 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
598 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
604 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
605 new JavaTestKit(getSystem()) {
607 String persistenceId = factory.generateActorId("leader-");
609 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
611 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
613 CountDownLatch persistLatch = new CountDownLatch(1);
614 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
615 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
617 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
618 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
620 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
622 mockRaftActor.waitForInitializeBehaviorComplete();
624 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
626 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
632 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
633 new JavaTestKit(getSystem()) {
635 String persistenceId = factory.generateActorId("leader-");
637 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
639 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
641 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
643 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
644 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
646 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
648 mockRaftActor.waitForInitializeBehaviorComplete();
650 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
652 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
654 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
660 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
661 new JavaTestKit(getSystem()) {
663 String persistenceId = factory.generateActorId("leader-");
665 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
667 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
669 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
671 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
672 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
674 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
676 mockRaftActor.waitForInitializeBehaviorComplete();
678 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
680 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
682 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
688 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
689 new JavaTestKit(getSystem()) {
691 String persistenceId = factory.generateActorId("leader-");
693 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
695 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
697 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
699 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
700 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
702 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
704 mockRaftActor.waitForInitializeBehaviorComplete();
706 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
708 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
716 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
717 new JavaTestKit(getSystem()) {
719 String persistenceId = factory.generateActorId("leader-");
721 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
723 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
725 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
727 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
728 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
729 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
731 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
733 mockRaftActor.waitForInitializeBehaviorComplete();
735 ByteString snapshotBytes = fromObject(Arrays.asList(
736 new MockRaftActorContext.MockPayload("A"),
737 new MockRaftActorContext.MockPayload("B"),
738 new MockRaftActorContext.MockPayload("C"),
739 new MockRaftActorContext.MockPayload("D")));
741 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
743 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
745 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
747 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
749 verify(dataPersistenceProvider).saveSnapshot(anyObject());
756 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
757 new JavaTestKit(getSystem()) {
759 String persistenceId = factory.generateActorId("leader-");
761 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
763 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
765 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
767 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
768 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
770 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
772 mockRaftActor.waitForInitializeBehaviorComplete();
774 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
775 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
776 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
777 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
778 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
780 ByteString snapshotBytes = fromObject(Arrays.asList(
781 new MockRaftActorContext.MockPayload("A"),
782 new MockRaftActorContext.MockPayload("B"),
783 new MockRaftActorContext.MockPayload("C"),
784 new MockRaftActorContext.MockPayload("D")));
786 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
787 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
789 long replicatedToAllIndex = 1;
790 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
792 verify(mockRaftActor.delegate).createSnapshot();
794 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
796 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
798 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
800 verify(dataPersistenceProvider).deleteMessages(100);
802 assertEquals(3, mockRaftActor.getReplicatedLog().size());
803 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
805 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
806 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
807 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
809 // Index 2 will not be in the log because it was removed due to snapshotting
810 assertNull(mockRaftActor.getReplicatedLog().get(1));
811 assertNull(mockRaftActor.getReplicatedLog().get(0));
818 public void testApplyState() throws Exception {
820 new JavaTestKit(getSystem()) {
822 String persistenceId = factory.generateActorId("leader-");
824 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
826 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
828 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
830 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
831 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
833 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
835 mockRaftActor.waitForInitializeBehaviorComplete();
837 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
838 new MockRaftActorContext.MockPayload("F"));
840 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
842 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
849 public void testApplySnapshot() throws Exception {
850 new JavaTestKit(getSystem()) {
852 String persistenceId = factory.generateActorId("leader-");
854 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
856 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
858 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
860 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
861 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
863 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
865 mockRaftActor.waitForInitializeBehaviorComplete();
867 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
869 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
870 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
871 oldReplicatedLog.append(
872 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
873 mock(Payload.class)));
875 ByteString snapshotBytes = fromObject(Arrays.asList(
876 new MockRaftActorContext.MockPayload("A"),
877 new MockRaftActorContext.MockPayload("B"),
878 new MockRaftActorContext.MockPayload("C"),
879 new MockRaftActorContext.MockPayload("D")));
881 Snapshot snapshot = mock(Snapshot.class);
883 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
885 doReturn(3L).when(snapshot).getLastAppliedIndex();
887 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
889 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
891 assertTrue("The replicatedLog should have changed",
892 oldReplicatedLog != mockRaftActor.getReplicatedLog());
894 assertEquals("lastApplied should be same as in the snapshot",
895 (Long) 3L, mockRaftActor.getLastApplied());
897 assertEquals(0, mockRaftActor.getReplicatedLog().size());
904 public void testSaveSnapshotFailure() throws Exception {
905 new JavaTestKit(getSystem()) {
907 String persistenceId = factory.generateActorId("leader-");
909 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
911 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
913 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
915 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
916 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
918 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
920 mockRaftActor.waitForInitializeBehaviorComplete();
922 ByteString snapshotBytes = fromObject(Arrays.asList(
923 new MockRaftActorContext.MockPayload("A"),
924 new MockRaftActorContext.MockPayload("B"),
925 new MockRaftActorContext.MockPayload("C"),
926 new MockRaftActorContext.MockPayload("D")));
928 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
930 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
932 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
934 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
936 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
939 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
940 mockRaftActor.getReplicatedLog().getSnapshotIndex());
947 public void testRaftRoleChangeNotifier() throws Exception {
948 new JavaTestKit(getSystem()) {{
949 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
950 Props.create(MessageCollectorActor.class));
951 MessageCollectorActor.waitUntilReady(notifierActor);
953 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
954 long heartBeatInterval = 100;
955 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
956 config.setElectionTimeoutFactor(1);
958 String persistenceId = factory.generateActorId("notifier-");
960 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
961 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
963 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
965 // check if the notifier got a role change from null to Follower
966 RoleChanged raftRoleChanged = matches.get(0);
967 assertEquals(persistenceId, raftRoleChanged.getMemberId());
968 assertNull(raftRoleChanged.getOldRole());
969 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
971 // check if the notifier got a role change from Follower to Candidate
972 raftRoleChanged = matches.get(1);
973 assertEquals(persistenceId, raftRoleChanged.getMemberId());
974 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
975 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
977 // check if the notifier got a role change from Candidate to Leader
978 raftRoleChanged = matches.get(2);
979 assertEquals(persistenceId, raftRoleChanged.getMemberId());
980 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
981 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
983 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
984 notifierActor, LeaderStateChanged.class);
986 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
988 notifierActor.underlyingActor().clear();
990 MockRaftActor raftActor = raftActorRef.underlyingActor();
991 final String newLeaderId = "new-leader";
992 Follower follower = new Follower(raftActor.getRaftActorContext()) {
994 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
995 leaderId = newLeaderId;
1000 raftActor.changeCurrentBehavior(follower);
1002 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1003 assertEquals(persistenceId, leaderStateChange.getMemberId());
1004 assertEquals(null, leaderStateChange.getLeaderId());
1006 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1007 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1008 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1010 notifierActor.underlyingActor().clear();
1012 raftActor.handleCommand("any");
1014 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1015 assertEquals(persistenceId, leaderStateChange.getMemberId());
1016 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1021 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1022 new JavaTestKit(getSystem()) {
1024 String persistenceId = factory.generateActorId("leader-");
1025 String follower1Id = factory.generateActorId("follower-");
1027 ActorRef followerActor1 =
1028 factory.createActor(Props.create(MessageCollectorActor.class));
1030 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1031 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1032 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1034 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1036 Map<String, String> peerAddresses = new HashMap<>();
1037 peerAddresses.put(follower1Id, followerActor1.path().toString());
1039 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1040 MockRaftActor.props(persistenceId, peerAddresses,
1041 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1043 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1045 leaderActor.getRaftActorContext().setCommitIndex(4);
1046 leaderActor.getRaftActorContext().setLastApplied(4);
1047 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1049 leaderActor.waitForInitializeBehaviorComplete();
1051 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1053 Leader leader = new Leader(leaderActor.getRaftActorContext());
1054 leaderActor.setCurrentBehavior(leader);
1055 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1057 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1058 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1060 assertEquals(8, leaderActor.getReplicatedLog().size());
1062 leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
1064 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1065 verify(leaderActor.delegate).createSnapshot();
1067 assertEquals(8, leaderActor.getReplicatedLog().size());
1069 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1070 //fake snapshot on index 5
1071 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1073 assertEquals(8, leaderActor.getReplicatedLog().size());
1075 //fake snapshot on index 6
1076 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1077 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1078 assertEquals(8, leaderActor.getReplicatedLog().size());
1080 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1082 assertEquals(8, leaderActor.getReplicatedLog().size());
1084 ByteString snapshotBytes = fromObject(Arrays.asList(
1085 new MockRaftActorContext.MockPayload("foo-0"),
1086 new MockRaftActorContext.MockPayload("foo-1"),
1087 new MockRaftActorContext.MockPayload("foo-2"),
1088 new MockRaftActorContext.MockPayload("foo-3"),
1089 new MockRaftActorContext.MockPayload("foo-4")));
1090 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1091 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1093 // capture snapshot reply should remove the snapshotted entries only
1094 assertEquals(3, leaderActor.getReplicatedLog().size());
1095 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1097 // add another non-replicated entry
1098 leaderActor.getReplicatedLog().append(
1099 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1101 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1102 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1103 assertEquals(2, leaderActor.getReplicatedLog().size());
1104 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1111 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1112 new JavaTestKit(getSystem()) {
1114 String persistenceId = factory.generateActorId("follower-");
1115 String leaderId = factory.generateActorId("leader-");
1118 ActorRef leaderActor1 =
1119 factory.createActor(Props.create(MessageCollectorActor.class));
1121 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1122 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1123 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1125 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1127 Map<String, String> peerAddresses = new HashMap<>();
1128 peerAddresses.put(leaderId, leaderActor1.path().toString());
1130 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1131 MockRaftActor.props(persistenceId, peerAddresses,
1132 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1134 MockRaftActor followerActor = mockActorRef.underlyingActor();
1135 followerActor.getRaftActorContext().setCommitIndex(4);
1136 followerActor.getRaftActorContext().setLastApplied(4);
1137 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1139 followerActor.waitForInitializeBehaviorComplete();
1142 Follower follower = new Follower(followerActor.getRaftActorContext());
1143 followerActor.setCurrentBehavior(follower);
1144 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1146 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1147 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1148 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1150 // log has indices 0-5
1151 assertEquals(6, followerActor.getReplicatedLog().size());
1154 followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
1156 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1157 verify(followerActor.delegate).createSnapshot();
1159 assertEquals(6, followerActor.getReplicatedLog().size());
1161 //fake snapshot on index 6
1162 List<ReplicatedLogEntry> entries =
1164 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1165 new MockRaftActorContext.MockPayload("foo-6"))
1167 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1168 assertEquals(7, followerActor.getReplicatedLog().size());
1170 //fake snapshot on index 7
1171 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1175 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1176 new MockRaftActorContext.MockPayload("foo-7"))
1178 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1179 assertEquals(8, followerActor.getReplicatedLog().size());
1181 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1184 ByteString snapshotBytes = fromObject(Arrays.asList(
1185 new MockRaftActorContext.MockPayload("foo-0"),
1186 new MockRaftActorContext.MockPayload("foo-1"),
1187 new MockRaftActorContext.MockPayload("foo-2"),
1188 new MockRaftActorContext.MockPayload("foo-3"),
1189 new MockRaftActorContext.MockPayload("foo-4")));
1190 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1191 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1193 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1194 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1195 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1199 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1200 new MockRaftActorContext.MockPayload("foo-7"))
1202 // send an additional entry 8 with leaderCommit = 7
1203 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1205 // 7 and 8, as lastapplied is 7
1206 assertEquals(2, followerActor.getReplicatedLog().size());
1213 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1214 new JavaTestKit(getSystem()) {
1216 String persistenceId = factory.generateActorId("leader-");
1217 String follower1Id = factory.generateActorId("follower-");
1218 String follower2Id = factory.generateActorId("follower-");
1220 ActorRef followerActor1 =
1221 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1222 ActorRef followerActor2 =
1223 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1225 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1226 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1227 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1229 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1231 Map<String, String> peerAddresses = new HashMap<>();
1232 peerAddresses.put(follower1Id, followerActor1.path().toString());
1233 peerAddresses.put(follower2Id, followerActor2.path().toString());
1235 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1236 MockRaftActor.props(persistenceId, peerAddresses,
1237 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1239 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1240 leaderActor.getRaftActorContext().setCommitIndex(9);
1241 leaderActor.getRaftActorContext().setLastApplied(9);
1242 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1244 leaderActor.waitForInitializeBehaviorComplete();
1246 Leader leader = new Leader(leaderActor.getRaftActorContext());
1247 leaderActor.setCurrentBehavior(leader);
1248 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1250 // create 5 entries in the log
1251 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1252 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1254 //set the snapshot index to 4 , 0 to 4 are snapshotted
1255 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1256 //setting replicatedToAllIndex = 9, for the log to clear
1257 leader.setReplicatedToAllIndex(9);
1258 assertEquals(5, leaderActor.getReplicatedLog().size());
1259 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1261 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1262 assertEquals(5, leaderActor.getReplicatedLog().size());
1263 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1265 // set the 2nd follower nextIndex to 1 which has been snapshotted
1266 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1267 assertEquals(5, leaderActor.getReplicatedLog().size());
1268 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1270 // simulate a real snapshot
1271 leaderActor.onReceiveCommand(new SendHeartBeat());
1272 assertEquals(5, leaderActor.getReplicatedLog().size());
1273 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1274 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1275 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1278 //reply from a slow follower does not initiate a fake snapshot
1279 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1280 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1282 ByteString snapshotBytes = fromObject(Arrays.asList(
1283 new MockRaftActorContext.MockPayload("foo-0"),
1284 new MockRaftActorContext.MockPayload("foo-1"),
1285 new MockRaftActorContext.MockPayload("foo-2"),
1286 new MockRaftActorContext.MockPayload("foo-3"),
1287 new MockRaftActorContext.MockPayload("foo-4")));
1288 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1289 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1291 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1293 //reply from a slow follower after should not raise errors
1294 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1295 assertEquals(0, leaderActor.getReplicatedLog().size());
1301 private static class NonPersistentProvider implements DataPersistenceProvider {
1303 public boolean isRecoveryApplicable() {
1308 public <T> void persist(T o, Procedure<T> procedure) {
1311 } catch (Exception e) {
1312 e.printStackTrace();
1317 public void saveSnapshot(Object o) {
1322 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1327 public void deleteMessages(long sequenceNumber) {
1333 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1334 new JavaTestKit(getSystem()) {{
1335 String persistenceId = factory.generateActorId("leader-");
1336 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1337 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1338 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1339 config.setSnapshotBatchCount(5);
1341 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1343 Map<String, String> peerAddresses = new HashMap<>();
1345 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1346 MockRaftActor.props(persistenceId, peerAddresses,
1347 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1349 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1350 leaderActor.getRaftActorContext().setCommitIndex(3);
1351 leaderActor.getRaftActorContext().setLastApplied(3);
1352 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1354 leaderActor.waitForInitializeBehaviorComplete();
1355 for(int i=0;i< 4;i++) {
1356 leaderActor.getReplicatedLog()
1357 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1358 new MockRaftActorContext.MockPayload("A")));
1361 Leader leader = new Leader(leaderActor.getRaftActorContext());
1362 leaderActor.setCurrentBehavior(leader);
1363 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1365 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1366 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1368 // Now send a CaptureSnapshotReply
1369 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1371 // Trimming log in this scenario is a no-op
1372 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1373 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1374 assertEquals(-1, leader.getReplicatedToAllIndex());
1380 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1381 new JavaTestKit(getSystem()) {{
1382 String persistenceId = factory.generateActorId("leader-");
1383 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1384 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1385 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1386 config.setSnapshotBatchCount(5);
1388 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1390 Map<String, String> peerAddresses = new HashMap<>();
1392 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1393 MockRaftActor.props(persistenceId, peerAddresses,
1394 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1396 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1397 leaderActor.getRaftActorContext().setCommitIndex(3);
1398 leaderActor.getRaftActorContext().setLastApplied(3);
1399 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1400 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1402 leaderActor.waitForInitializeBehaviorComplete();
1403 Leader leader = new Leader(leaderActor.getRaftActorContext());
1404 leaderActor.setCurrentBehavior(leader);
1405 leader.setReplicatedToAllIndex(3);
1406 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1408 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1409 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1411 // Now send a CaptureSnapshotReply
1412 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1414 // Trimming log in this scenario is a no-op
1415 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1416 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1417 assertEquals(3, leader.getReplicatedToAllIndex());
1422 private ByteString fromObject(Object snapshot) throws Exception {
1423 ByteArrayOutputStream b = null;
1424 ObjectOutputStream o = null;
1426 b = new ByteArrayOutputStream();
1427 o = new ObjectOutputStream(b);
1428 o.writeObject(snapshot);
1429 byte[] snapshotBytes = b.toByteArray();
1430 return ByteString.copyFrom(snapshotBytes);