1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.Nonnull;
53 import org.junit.After;
54 import org.junit.Assert;
55 import org.junit.Before;
56 import org.junit.Test;
57 import org.opendaylight.controller.cluster.DataPersistenceProvider;
58 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
59 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
60 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
61 import org.opendaylight.controller.cluster.notifications.RoleChanged;
62 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
65 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
66 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
67 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
68 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
69 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
70 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
71 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
72 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
73 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
74 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
75 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
76 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
77 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
78 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
79 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
80 import scala.concurrent.Await;
81 import scala.concurrent.Future;
82 import scala.concurrent.duration.Duration;
83 import scala.concurrent.duration.FiniteDuration;
85 public class RaftActorTest extends AbstractActorTest {
87 private TestActorFactory factory;
91 factory = new TestActorFactory(getSystem());
95 public void tearDown() throws Exception {
97 InMemoryJournal.clear();
98 InMemorySnapshotStore.clear();
101 public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
103 private final RaftActor actorDelegate;
104 private final RaftActorRecoveryCohort recoveryCohortDelegate;
105 private final RaftActorSnapshotCohort snapshotCohortDelegate;
106 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
107 private final List<Object> state;
108 private ActorRef roleChangeNotifier;
109 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
111 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
112 private static final long serialVersionUID = 1L;
113 private final Map<String, String> peerAddresses;
114 private final String id;
115 private final Optional<ConfigParams> config;
116 private final DataPersistenceProvider dataPersistenceProvider;
117 private final ActorRef roleChangeNotifier;
119 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
120 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
121 ActorRef roleChangeNotifier) {
122 this.peerAddresses = peerAddresses;
124 this.config = config;
125 this.dataPersistenceProvider = dataPersistenceProvider;
126 this.roleChangeNotifier = roleChangeNotifier;
130 public MockRaftActor create() throws Exception {
131 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
132 dataPersistenceProvider);
133 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
134 return mockRaftActor;
138 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
139 DataPersistenceProvider dataPersistenceProvider) {
140 super(id, peerAddresses, config);
141 state = new ArrayList<>();
142 this.actorDelegate = mock(RaftActor.class);
143 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
144 this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
145 if(dataPersistenceProvider == null){
146 setPersistence(true);
148 setPersistence(dataPersistenceProvider);
152 public void waitForRecoveryComplete() {
154 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
155 } catch (InterruptedException e) {
160 public void waitForInitializeBehaviorComplete() {
162 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
163 } catch (InterruptedException e) {
169 public void waitUntilLeader(){
170 for(int i = 0;i < 10; i++){
174 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
178 public List<Object> getState() {
182 public static Props props(final String id, final Map<String, String> peerAddresses,
183 Optional<ConfigParams> config){
184 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
187 public static Props props(final String id, final Map<String, String> peerAddresses,
188 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
189 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
192 public static Props props(final String id, final Map<String, String> peerAddresses,
193 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
194 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
197 public static Props props(final String id, final Map<String, String> peerAddresses,
198 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
199 DataPersistenceProvider dataPersistenceProvider){
200 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
204 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
205 actorDelegate.applyState(clientActor, identifier, data);
206 LOG.info("{}: applyState called", persistenceId());
211 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
216 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
221 public void startLogRecoveryBatch(int maxBatchSize) {
225 public void appendRecoveredLogEntry(Payload data) {
230 public void applyCurrentLogRecoveryBatch() {
234 protected void onRecoveryComplete() {
235 actorDelegate.onRecoveryComplete();
236 recoveryComplete.countDown();
240 protected void initializeBehavior() {
241 super.initializeBehavior();
242 initializeBehaviorComplete.countDown();
246 public void applyRecoverySnapshot(byte[] bytes) {
247 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
249 Object data = toObject(bytes);
250 if (data instanceof List) {
251 state.addAll((List<?>) data);
253 } catch (Exception e) {
259 public void createSnapshot(ActorRef actorRef) {
260 LOG.info("{}: createSnapshot called", persistenceId());
261 snapshotCohortDelegate.createSnapshot(actorRef);
265 public void applySnapshot(byte [] snapshot) {
266 LOG.info("{}: applySnapshot called", persistenceId());
267 snapshotCohortDelegate.applySnapshot(snapshot);
271 protected void onStateChanged() {
272 actorDelegate.onStateChanged();
276 protected Optional<ActorRef> getRoleChangeNotifier() {
277 return Optional.fromNullable(roleChangeNotifier);
280 @Override public String persistenceId() {
284 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
286 ByteArrayInputStream bis = null;
287 ObjectInputStream ois = null;
289 bis = new ByteArrayInputStream(bs);
290 ois = new ObjectInputStream(bis);
291 obj = ois.readObject();
303 public ReplicatedLog getReplicatedLog(){
304 return this.getRaftActorContext().getReplicatedLog();
309 public static class RaftActorTestKit extends JavaTestKit {
310 private final ActorRef raftActor;
312 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
315 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
316 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
321 public ActorRef getRaftActor() {
325 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
326 // Wait for a specific log message to show up
328 new JavaTestKit.EventFilter<Boolean>(logEventClass
331 protected Boolean run() {
334 }.from(raftActor.path().toString())
336 .occurrences(1).exec();
341 protected void waitUntilLeader(){
342 waitUntilLeader(raftActor);
345 public static void waitUntilLeader(ActorRef actorRef) {
346 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
347 for(int i = 0; i < 20 * 5; i++) {
348 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
350 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
351 if(resp.getLeaderActor() != null) {
354 } catch(TimeoutException e) {
355 } catch(Exception e) {
356 System.err.println("FindLeader threw ex");
361 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
364 Assert.fail("Leader not found for actorRef " + actorRef.path());
371 public void testConstruction() {
372 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
376 public void testFindLeaderWhenLeaderIsSelf(){
377 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
378 kit.waitUntilLeader();
382 public void testRaftActorRecovery() throws Exception {
383 new JavaTestKit(getSystem()) {{
384 String persistenceId = factory.generateActorId("follower-");
386 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
387 // Set the heartbeat interval high to essentially disable election otherwise the test
388 // may fail if the actor is switched to Leader and the commitIndex is set to the last
390 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
392 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
393 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
395 watch(followerActor);
397 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
398 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
399 new MockRaftActorContext.MockPayload("E"));
400 snapshotUnappliedEntries.add(entry1);
402 int lastAppliedDuringSnapshotCapture = 3;
403 int lastIndexDuringSnapshotCapture = 4;
405 // 4 messages as part of snapshot, which are applied to state
406 ByteString snapshotBytes = fromObject(Arrays.asList(
407 new MockRaftActorContext.MockPayload("A"),
408 new MockRaftActorContext.MockPayload("B"),
409 new MockRaftActorContext.MockPayload("C"),
410 new MockRaftActorContext.MockPayload("D")));
412 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
413 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
414 lastAppliedDuringSnapshotCapture, 1);
415 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
417 // add more entries after snapshot is taken
418 List<ReplicatedLogEntry> entries = new ArrayList<>();
419 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
420 new MockRaftActorContext.MockPayload("F", 2));
421 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
422 new MockRaftActorContext.MockPayload("G", 3));
423 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
424 new MockRaftActorContext.MockPayload("H", 4));
429 int lastAppliedToState = 5;
432 InMemoryJournal.addEntry(persistenceId, 5, entry2);
433 // 2 entries are applied to state besides the 4 entries in snapshot
434 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
435 InMemoryJournal.addEntry(persistenceId, 7, entry3);
436 InMemoryJournal.addEntry(persistenceId, 8, entry4);
439 followerActor.tell(PoisonPill.getInstance(), null);
440 expectMsgClass(duration("5 seconds"), Terminated.class);
442 unwatch(followerActor);
444 //reinstate the actor
445 TestActorRef<MockRaftActor> ref = factory.createTestActor(
446 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
447 Optional.<ConfigParams>of(config)));
449 ref.underlyingActor().waitForRecoveryComplete();
451 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
452 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
453 context.getReplicatedLog().size());
454 assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
455 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
456 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
457 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
458 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
463 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
464 new JavaTestKit(getSystem()) {{
465 String persistenceId = factory.generateActorId("leader-");
467 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
468 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
470 // Setup the persisted journal with some entries
471 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
472 new MockRaftActorContext.MockPayload("zero"));
473 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
474 new MockRaftActorContext.MockPayload("oen"));
475 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
476 new MockRaftActorContext.MockPayload("two"));
479 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
480 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
481 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
482 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
484 int lastAppliedToState = 1;
487 //reinstate the actor
488 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
489 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
490 Optional.<ConfigParams>of(config)));
492 leaderActor.underlyingActor().waitForRecoveryComplete();
494 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
495 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
496 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
497 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
498 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
503 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
504 * process recovery messages
510 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
511 new JavaTestKit(getSystem()) {
513 String persistenceId = factory.generateActorId("leader-");
515 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
517 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
519 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
520 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
522 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
524 // Wait for akka's recovery to complete so it doesn't interfere.
525 mockRaftActor.waitForRecoveryComplete();
527 ByteString snapshotBytes = fromObject(Arrays.asList(
528 new MockRaftActorContext.MockPayload("A"),
529 new MockRaftActorContext.MockPayload("B"),
530 new MockRaftActorContext.MockPayload("C"),
531 new MockRaftActorContext.MockPayload("D")));
533 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
534 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
536 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
538 verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
540 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
542 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
544 assertEquals("add replicated log entry", 1, replicatedLog.size());
546 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
548 assertEquals("add replicated log entry", 2, replicatedLog.size());
550 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
552 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
554 // The snapshot had 4 items + we added 2 more items during the test
555 // We start removing from 5 and we should get 1 item in the replicated log
556 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
558 assertEquals("remove log entries", 1, replicatedLog.size());
560 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
562 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
563 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
565 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
571 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
572 * not process recovery messages
577 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
578 new JavaTestKit(getSystem()) {
580 String persistenceId = factory.generateActorId("leader-");
582 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
584 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
586 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
587 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
589 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
591 // Wait for akka's recovery to complete so it doesn't interfere.
592 mockRaftActor.waitForRecoveryComplete();
594 ByteString snapshotBytes = fromObject(Arrays.asList(
595 new MockRaftActorContext.MockPayload("A"),
596 new MockRaftActorContext.MockPayload("B"),
597 new MockRaftActorContext.MockPayload("C"),
598 new MockRaftActorContext.MockPayload("D")));
600 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
601 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
603 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
605 verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
607 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
609 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
611 assertEquals("add replicated log entry", 0, replicatedLog.size());
613 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
615 assertEquals("add replicated log entry", 0, replicatedLog.size());
617 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
619 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
621 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
623 assertEquals("remove log entries", 0, replicatedLog.size());
625 mockRaftActor.onReceiveRecover(new UpdateElectionTerm(10, "foobar"));
627 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
628 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
630 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
636 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
637 new JavaTestKit(getSystem()) {
639 String persistenceId = factory.generateActorId("leader-");
641 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
643 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
645 CountDownLatch persistLatch = new CountDownLatch(1);
646 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
647 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
649 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
650 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
652 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
654 mockRaftActor.waitForInitializeBehaviorComplete();
656 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
658 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
664 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
665 new JavaTestKit(getSystem()) {
667 String persistenceId = factory.generateActorId("leader-");
669 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
671 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
673 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
675 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
676 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
678 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
680 mockRaftActor.waitForInitializeBehaviorComplete();
682 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
684 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
686 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
692 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
693 new JavaTestKit(getSystem()) {
695 String persistenceId = factory.generateActorId("leader-");
697 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
699 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
701 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
703 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
704 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
706 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
708 mockRaftActor.waitForInitializeBehaviorComplete();
710 mockRaftActor.waitUntilLeader();
712 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
714 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
716 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
722 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
723 new JavaTestKit(getSystem()) {
725 String persistenceId = factory.generateActorId("leader-");
727 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
729 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
731 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
733 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
734 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
736 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
738 mockRaftActor.waitForInitializeBehaviorComplete();
740 mockRaftActor.waitUntilLeader();
742 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
744 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
752 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
753 new JavaTestKit(getSystem()) {
755 String persistenceId = factory.generateActorId("leader-");
757 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
759 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
761 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
763 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
764 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
765 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
767 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
769 mockRaftActor.waitForInitializeBehaviorComplete();
771 ByteString snapshotBytes = fromObject(Arrays.asList(
772 new MockRaftActorContext.MockPayload("A"),
773 new MockRaftActorContext.MockPayload("B"),
774 new MockRaftActorContext.MockPayload("C"),
775 new MockRaftActorContext.MockPayload("D")));
777 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
779 raftActorContext.getSnapshotManager().capture(
780 new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
781 new MockRaftActorContext.MockPayload("D")), -1);
783 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
785 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
787 verify(dataPersistenceProvider).saveSnapshot(anyObject());
794 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
795 new JavaTestKit(getSystem()) {
797 String persistenceId = factory.generateActorId("leader-");
799 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
801 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
803 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
805 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
806 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
808 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
810 mockRaftActor.waitForInitializeBehaviorComplete();
811 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
813 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
814 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
815 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
816 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
817 mockRaftActor.getReplicatedLog().append(lastEntry);
819 ByteString snapshotBytes = fromObject(Arrays.asList(
820 new MockRaftActorContext.MockPayload("A"),
821 new MockRaftActorContext.MockPayload("B"),
822 new MockRaftActorContext.MockPayload("C"),
823 new MockRaftActorContext.MockPayload("D")));
825 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
826 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
828 long replicatedToAllIndex = 1;
830 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
832 verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
834 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
836 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
838 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
840 verify(dataPersistenceProvider).deleteMessages(100);
842 assertEquals(3, mockRaftActor.getReplicatedLog().size());
843 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
845 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
846 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
847 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
849 // Index 2 will not be in the log because it was removed due to snapshotting
850 assertNull(mockRaftActor.getReplicatedLog().get(1));
851 assertNull(mockRaftActor.getReplicatedLog().get(0));
858 public void testApplyState() throws Exception {
860 new JavaTestKit(getSystem()) {
862 String persistenceId = factory.generateActorId("leader-");
864 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
866 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
868 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
870 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
871 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
873 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
875 mockRaftActor.waitForInitializeBehaviorComplete();
877 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
878 new MockRaftActorContext.MockPayload("F"));
880 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
882 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
889 public void testApplySnapshot() throws Exception {
890 new JavaTestKit(getSystem()) {
892 String persistenceId = factory.generateActorId("leader-");
894 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
896 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
898 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
900 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
901 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
903 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
905 mockRaftActor.waitForInitializeBehaviorComplete();
907 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
909 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
910 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
911 oldReplicatedLog.append(
912 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
913 mock(Payload.class)));
915 ByteString snapshotBytes = fromObject(Arrays.asList(
916 new MockRaftActorContext.MockPayload("A"),
917 new MockRaftActorContext.MockPayload("B"),
918 new MockRaftActorContext.MockPayload("C"),
919 new MockRaftActorContext.MockPayload("D")));
921 Snapshot snapshot = mock(Snapshot.class);
923 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
925 doReturn(3L).when(snapshot).getLastAppliedIndex();
927 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
929 verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
931 assertTrue("The replicatedLog should have changed",
932 oldReplicatedLog != mockRaftActor.getReplicatedLog());
934 assertEquals("lastApplied should be same as in the snapshot",
935 (Long) 3L, mockRaftActor.getLastApplied());
937 assertEquals(0, mockRaftActor.getReplicatedLog().size());
944 public void testSaveSnapshotFailure() throws Exception {
945 new JavaTestKit(getSystem()) {
947 String persistenceId = factory.generateActorId("leader-");
949 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
951 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
953 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
955 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
956 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
958 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
960 mockRaftActor.waitForInitializeBehaviorComplete();
962 ByteString snapshotBytes = fromObject(Arrays.asList(
963 new MockRaftActorContext.MockPayload("A"),
964 new MockRaftActorContext.MockPayload("B"),
965 new MockRaftActorContext.MockPayload("C"),
966 new MockRaftActorContext.MockPayload("D")));
968 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
970 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
972 raftActorContext.getSnapshotManager().capture(
973 new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
974 new MockRaftActorContext.MockPayload("D")), 1);
976 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
978 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
981 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
982 mockRaftActor.getReplicatedLog().getSnapshotIndex());
989 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
990 new JavaTestKit(getSystem()) {{
991 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
992 Props.create(MessageCollectorActor.class));
993 MessageCollectorActor.waitUntilReady(notifierActor);
995 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
996 long heartBeatInterval = 100;
997 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
998 config.setElectionTimeoutFactor(20);
1000 String persistenceId = factory.generateActorId("notifier-");
1002 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
1003 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
1004 new NonPersistentDataProvider()), persistenceId);
1006 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
1009 // check if the notifier got a role change from null to Follower
1010 RoleChanged raftRoleChanged = matches.get(0);
1011 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1012 assertNull(raftRoleChanged.getOldRole());
1013 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1015 // check if the notifier got a role change from Follower to Candidate
1016 raftRoleChanged = matches.get(1);
1017 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1018 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1019 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1021 // check if the notifier got a role change from Candidate to Leader
1022 raftRoleChanged = matches.get(2);
1023 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1024 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1025 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1027 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1028 notifierActor, LeaderStateChanged.class);
1030 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1032 notifierActor.underlyingActor().clear();
1034 MockRaftActor raftActor = raftActorRef.underlyingActor();
1035 final String newLeaderId = "new-leader";
1036 Follower follower = new Follower(raftActor.getRaftActorContext()) {
1038 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1039 leaderId = newLeaderId;
1044 raftActor.changeCurrentBehavior(follower);
1046 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1047 assertEquals(persistenceId, leaderStateChange.getMemberId());
1048 assertEquals(null, leaderStateChange.getLeaderId());
1050 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1051 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1052 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1054 notifierActor.underlyingActor().clear();
1056 raftActor.handleCommand("any");
1058 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1059 assertEquals(persistenceId, leaderStateChange.getMemberId());
1060 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1065 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1066 new JavaTestKit(getSystem()) {{
1067 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1068 MessageCollectorActor.waitUntilReady(notifierActor);
1070 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1071 long heartBeatInterval = 100;
1072 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1073 config.setElectionTimeoutFactor(1);
1075 String persistenceId = factory.generateActorId("notifier-");
1077 factory.createActor(MockRaftActor.props(persistenceId,
1078 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1080 List<RoleChanged> matches = null;
1081 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1082 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1083 assertNotNull(matches);
1084 if(matches.size() == 3) {
1087 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1090 assertEquals(2, matches.size());
1092 // check if the notifier got a role change from null to Follower
1093 RoleChanged raftRoleChanged = matches.get(0);
1094 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1095 assertNull(raftRoleChanged.getOldRole());
1096 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1098 // check if the notifier got a role change from Follower to Candidate
1099 raftRoleChanged = matches.get(1);
1100 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1101 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1102 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1108 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1109 new JavaTestKit(getSystem()) {
1111 String persistenceId = factory.generateActorId("leader-");
1112 String follower1Id = factory.generateActorId("follower-");
1114 ActorRef followerActor1 =
1115 factory.createActor(Props.create(MessageCollectorActor.class));
1117 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1118 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1119 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1121 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1123 Map<String, String> peerAddresses = new HashMap<>();
1124 peerAddresses.put(follower1Id, followerActor1.path().toString());
1126 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1127 MockRaftActor.props(persistenceId, peerAddresses,
1128 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1130 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1132 leaderActor.getRaftActorContext().setCommitIndex(4);
1133 leaderActor.getRaftActorContext().setLastApplied(4);
1134 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1136 leaderActor.waitForInitializeBehaviorComplete();
1138 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1140 Leader leader = new Leader(leaderActor.getRaftActorContext());
1141 leaderActor.setCurrentBehavior(leader);
1142 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1144 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1145 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1147 assertEquals(8, leaderActor.getReplicatedLog().size());
1149 leaderActor.getRaftActorContext().getSnapshotManager()
1150 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1151 new MockRaftActorContext.MockPayload("x")), 4);
1153 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1155 assertEquals(8, leaderActor.getReplicatedLog().size());
1157 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1158 //fake snapshot on index 5
1159 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1161 assertEquals(8, leaderActor.getReplicatedLog().size());
1163 //fake snapshot on index 6
1164 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1165 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1166 assertEquals(8, leaderActor.getReplicatedLog().size());
1168 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1170 assertEquals(8, leaderActor.getReplicatedLog().size());
1172 ByteString snapshotBytes = fromObject(Arrays.asList(
1173 new MockRaftActorContext.MockPayload("foo-0"),
1174 new MockRaftActorContext.MockPayload("foo-1"),
1175 new MockRaftActorContext.MockPayload("foo-2"),
1176 new MockRaftActorContext.MockPayload("foo-3"),
1177 new MockRaftActorContext.MockPayload("foo-4")));
1179 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1180 , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1182 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1184 // The commit is needed to complete the snapshot creation process
1185 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1187 // capture snapshot reply should remove the snapshotted entries only
1188 assertEquals(3, leaderActor.getReplicatedLog().size());
1189 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1191 // add another non-replicated entry
1192 leaderActor.getReplicatedLog().append(
1193 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1195 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1196 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1197 assertEquals(2, leaderActor.getReplicatedLog().size());
1198 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1205 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1206 new JavaTestKit(getSystem()) {
1208 String persistenceId = factory.generateActorId("follower-");
1209 String leaderId = factory.generateActorId("leader-");
1212 ActorRef leaderActor1 =
1213 factory.createActor(Props.create(MessageCollectorActor.class));
1215 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1216 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1217 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1219 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1221 Map<String, String> peerAddresses = new HashMap<>();
1222 peerAddresses.put(leaderId, leaderActor1.path().toString());
1224 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1225 MockRaftActor.props(persistenceId, peerAddresses,
1226 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1228 MockRaftActor followerActor = mockActorRef.underlyingActor();
1229 followerActor.getRaftActorContext().setCommitIndex(4);
1230 followerActor.getRaftActorContext().setLastApplied(4);
1231 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1233 followerActor.waitForInitializeBehaviorComplete();
1236 Follower follower = new Follower(followerActor.getRaftActorContext());
1237 followerActor.setCurrentBehavior(follower);
1238 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1240 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1241 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1242 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1244 // log has indices 0-5
1245 assertEquals(6, followerActor.getReplicatedLog().size());
1248 followerActor.getRaftActorContext().getSnapshotManager().capture(
1249 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1250 new MockRaftActorContext.MockPayload("D")), 4);
1252 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
1254 assertEquals(6, followerActor.getReplicatedLog().size());
1256 //fake snapshot on index 6
1257 List<ReplicatedLogEntry> entries =
1259 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1260 new MockRaftActorContext.MockPayload("foo-6"))
1262 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1263 assertEquals(7, followerActor.getReplicatedLog().size());
1265 //fake snapshot on index 7
1266 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1270 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1271 new MockRaftActorContext.MockPayload("foo-7"))
1273 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1274 assertEquals(8, followerActor.getReplicatedLog().size());
1276 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1279 ByteString snapshotBytes = fromObject(Arrays.asList(
1280 new MockRaftActorContext.MockPayload("foo-0"),
1281 new MockRaftActorContext.MockPayload("foo-1"),
1282 new MockRaftActorContext.MockPayload("foo-2"),
1283 new MockRaftActorContext.MockPayload("foo-3"),
1284 new MockRaftActorContext.MockPayload("foo-4")));
1285 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1286 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1288 // The commit is needed to complete the snapshot creation process
1289 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1291 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1292 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1293 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1297 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1298 new MockRaftActorContext.MockPayload("foo-7"))
1300 // send an additional entry 8 with leaderCommit = 7
1301 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1303 // 7 and 8, as lastapplied is 7
1304 assertEquals(2, followerActor.getReplicatedLog().size());
1311 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1312 new JavaTestKit(getSystem()) {
1314 String persistenceId = factory.generateActorId("leader-");
1315 String follower1Id = factory.generateActorId("follower-");
1316 String follower2Id = factory.generateActorId("follower-");
1318 ActorRef followerActor1 =
1319 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1320 ActorRef followerActor2 =
1321 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1323 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1324 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1325 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1327 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1329 Map<String, String> peerAddresses = new HashMap<>();
1330 peerAddresses.put(follower1Id, followerActor1.path().toString());
1331 peerAddresses.put(follower2Id, followerActor2.path().toString());
1333 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1334 MockRaftActor.props(persistenceId, peerAddresses,
1335 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1337 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1338 leaderActor.getRaftActorContext().setCommitIndex(9);
1339 leaderActor.getRaftActorContext().setLastApplied(9);
1340 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1342 leaderActor.waitForInitializeBehaviorComplete();
1344 Leader leader = new Leader(leaderActor.getRaftActorContext());
1345 leaderActor.setCurrentBehavior(leader);
1346 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1348 // create 5 entries in the log
1349 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1350 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1352 //set the snapshot index to 4 , 0 to 4 are snapshotted
1353 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1354 //setting replicatedToAllIndex = 9, for the log to clear
1355 leader.setReplicatedToAllIndex(9);
1356 assertEquals(5, leaderActor.getReplicatedLog().size());
1357 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1359 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1360 assertEquals(5, leaderActor.getReplicatedLog().size());
1361 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1363 // set the 2nd follower nextIndex to 1 which has been snapshotted
1364 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1365 assertEquals(5, leaderActor.getReplicatedLog().size());
1366 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1368 // simulate a real snapshot
1369 leaderActor.onReceiveCommand(new SendHeartBeat());
1370 assertEquals(5, leaderActor.getReplicatedLog().size());
1371 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1372 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1373 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1376 //reply from a slow follower does not initiate a fake snapshot
1377 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1378 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1380 ByteString snapshotBytes = fromObject(Arrays.asList(
1381 new MockRaftActorContext.MockPayload("foo-0"),
1382 new MockRaftActorContext.MockPayload("foo-1"),
1383 new MockRaftActorContext.MockPayload("foo-2"),
1384 new MockRaftActorContext.MockPayload("foo-3"),
1385 new MockRaftActorContext.MockPayload("foo-4")));
1386 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1387 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1389 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1391 //reply from a slow follower after should not raise errors
1392 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1393 assertEquals(0, leaderActor.getReplicatedLog().size());
1399 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1400 new JavaTestKit(getSystem()) {{
1401 String persistenceId = factory.generateActorId("leader-");
1402 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1403 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1404 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1405 config.setSnapshotBatchCount(5);
1407 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1409 Map<String, String> peerAddresses = new HashMap<>();
1411 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1412 MockRaftActor.props(persistenceId, peerAddresses,
1413 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1415 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1416 leaderActor.getRaftActorContext().setCommitIndex(3);
1417 leaderActor.getRaftActorContext().setLastApplied(3);
1418 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1420 leaderActor.waitForInitializeBehaviorComplete();
1421 for(int i=0;i< 4;i++) {
1422 leaderActor.getReplicatedLog()
1423 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1424 new MockRaftActorContext.MockPayload("A")));
1427 Leader leader = new Leader(leaderActor.getRaftActorContext());
1428 leaderActor.setCurrentBehavior(leader);
1429 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1431 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1432 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1434 // Now send a CaptureSnapshotReply
1435 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1437 // Trimming log in this scenario is a no-op
1438 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1439 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1440 assertEquals(-1, leader.getReplicatedToAllIndex());
1446 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1447 new JavaTestKit(getSystem()) {{
1448 String persistenceId = factory.generateActorId("leader-");
1449 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1450 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1451 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1452 config.setSnapshotBatchCount(5);
1454 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1456 Map<String, String> peerAddresses = new HashMap<>();
1458 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1459 MockRaftActor.props(persistenceId, peerAddresses,
1460 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1462 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1463 leaderActor.getRaftActorContext().setCommitIndex(3);
1464 leaderActor.getRaftActorContext().setLastApplied(3);
1465 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1466 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1468 leaderActor.waitForInitializeBehaviorComplete();
1469 Leader leader = new Leader(leaderActor.getRaftActorContext());
1470 leaderActor.setCurrentBehavior(leader);
1471 leader.setReplicatedToAllIndex(3);
1472 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1474 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1475 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1477 // Now send a CaptureSnapshotReply
1478 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1480 // Trimming log in this scenario is a no-op
1481 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1482 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1483 assertEquals(3, leader.getReplicatedToAllIndex());
1488 private ByteString fromObject(Object snapshot) throws Exception {
1489 ByteArrayOutputStream b = null;
1490 ObjectOutputStream o = null;
1492 b = new ByteArrayOutputStream();
1493 o = new ObjectOutputStream(b);
1494 o.writeObject(snapshot);
1495 byte[] snapshotBytes = b.toByteArray();
1496 return ByteString.copyFrom(snapshotBytes);