1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
58 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
59 import org.opendaylight.controller.cluster.notifications.RoleChanged;
60 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
64 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
65 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
66 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
67 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
68 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
69 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
71 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
73 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
74 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
75 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
76 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
77 import scala.concurrent.Await;
78 import scala.concurrent.Future;
79 import scala.concurrent.duration.Duration;
80 import scala.concurrent.duration.FiniteDuration;
82 public class RaftActorTest extends AbstractActorTest {
84 private TestActorFactory factory;
88 factory = new TestActorFactory(getSystem());
92 public void tearDown() throws Exception {
94 InMemoryJournal.clear();
95 InMemorySnapshotStore.clear();
98 public static class MockRaftActor extends RaftActor {
100 protected DataPersistenceProvider dataPersistenceProvider;
101 private final RaftActor delegate;
102 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
103 private final List<Object> state;
104 private ActorRef roleChangeNotifier;
105 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
107 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
108 private static final long serialVersionUID = 1L;
109 private final Map<String, String> peerAddresses;
110 private final String id;
111 private final Optional<ConfigParams> config;
112 private final DataPersistenceProvider dataPersistenceProvider;
113 private final ActorRef roleChangeNotifier;
115 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
116 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
117 ActorRef roleChangeNotifier) {
118 this.peerAddresses = peerAddresses;
120 this.config = config;
121 this.dataPersistenceProvider = dataPersistenceProvider;
122 this.roleChangeNotifier = roleChangeNotifier;
126 public MockRaftActor create() throws Exception {
127 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
128 dataPersistenceProvider);
129 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
130 return mockRaftActor;
134 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
135 DataPersistenceProvider dataPersistenceProvider) {
136 super(id, peerAddresses, config);
137 state = new ArrayList<>();
138 this.delegate = mock(RaftActor.class);
139 if(dataPersistenceProvider == null){
140 this.dataPersistenceProvider = new PersistentDataProvider();
142 this.dataPersistenceProvider = dataPersistenceProvider;
146 public void waitForRecoveryComplete() {
148 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
149 } catch (InterruptedException e) {
154 public void waitForInitializeBehaviorComplete() {
156 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
157 } catch (InterruptedException e) {
163 public void waitUntilLeader(){
164 for(int i = 0;i < 10; i++){
168 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
172 public List<Object> getState() {
176 public static Props props(final String id, final Map<String, String> peerAddresses,
177 Optional<ConfigParams> config){
178 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
181 public static Props props(final String id, final Map<String, String> peerAddresses,
182 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
183 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
186 public static Props props(final String id, final Map<String, String> peerAddresses,
187 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
188 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
191 public static Props props(final String id, final Map<String, String> peerAddresses,
192 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
193 DataPersistenceProvider dataPersistenceProvider){
194 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
198 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
199 delegate.applyState(clientActor, identifier, data);
200 LOG.info("{}: applyState called", persistenceId());
204 protected void startLogRecoveryBatch(int maxBatchSize) {
208 protected void appendRecoveredLogEntry(Payload data) {
213 protected void applyCurrentLogRecoveryBatch() {
217 protected void onRecoveryComplete() {
218 delegate.onRecoveryComplete();
219 recoveryComplete.countDown();
223 protected void initializeBehavior() {
224 super.initializeBehavior();
225 initializeBehaviorComplete.countDown();
229 protected void applyRecoverySnapshot(byte[] bytes) {
230 delegate.applyRecoverySnapshot(bytes);
232 Object data = toObject(bytes);
233 if (data instanceof List) {
234 state.addAll((List<?>) data);
236 } catch (Exception e) {
241 @Override protected void createSnapshot() {
242 LOG.info("{}: createSnapshot called", persistenceId());
243 delegate.createSnapshot();
246 @Override protected void applySnapshot(byte [] snapshot) {
247 LOG.info("{}: applySnapshot called", persistenceId());
248 delegate.applySnapshot(snapshot);
251 @Override protected void onStateChanged() {
252 delegate.onStateChanged();
256 protected DataPersistenceProvider persistence() {
257 return this.dataPersistenceProvider;
261 protected Optional<ActorRef> getRoleChangeNotifier() {
262 return Optional.fromNullable(roleChangeNotifier);
265 @Override public String persistenceId() {
269 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
271 ByteArrayInputStream bis = null;
272 ObjectInputStream ois = null;
274 bis = new ByteArrayInputStream(bs);
275 ois = new ObjectInputStream(bis);
276 obj = ois.readObject();
288 public ReplicatedLog getReplicatedLog(){
289 return this.getRaftActorContext().getReplicatedLog();
295 public static class RaftActorTestKit extends JavaTestKit {
296 private final ActorRef raftActor;
298 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
301 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
302 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
307 public ActorRef getRaftActor() {
311 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
312 // Wait for a specific log message to show up
314 new JavaTestKit.EventFilter<Boolean>(logEventClass
317 protected Boolean run() {
320 }.from(raftActor.path().toString())
322 .occurrences(1).exec();
327 protected void waitUntilLeader(){
328 waitUntilLeader(raftActor);
331 public static void waitUntilLeader(ActorRef actorRef) {
332 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
333 for(int i = 0; i < 20 * 5; i++) {
334 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
336 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
337 if(resp.getLeaderActor() != null) {
340 } catch(TimeoutException e) {
341 } catch(Exception e) {
342 System.err.println("FindLeader threw ex");
347 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
350 Assert.fail("Leader not found for actorRef " + actorRef.path());
357 public void testConstruction() {
358 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
362 public void testFindLeaderWhenLeaderIsSelf(){
363 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
364 kit.waitUntilLeader();
368 public void testRaftActorRecovery() throws Exception {
369 new JavaTestKit(getSystem()) {{
370 String persistenceId = factory.generateActorId("follower-");
372 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
373 // Set the heartbeat interval high to essentially disable election otherwise the test
374 // may fail if the actor is switched to Leader and the commitIndex is set to the last
376 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
378 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
379 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
381 watch(followerActor);
383 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
384 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
385 new MockRaftActorContext.MockPayload("E"));
386 snapshotUnappliedEntries.add(entry1);
388 int lastAppliedDuringSnapshotCapture = 3;
389 int lastIndexDuringSnapshotCapture = 4;
391 // 4 messages as part of snapshot, which are applied to state
392 ByteString snapshotBytes = fromObject(Arrays.asList(
393 new MockRaftActorContext.MockPayload("A"),
394 new MockRaftActorContext.MockPayload("B"),
395 new MockRaftActorContext.MockPayload("C"),
396 new MockRaftActorContext.MockPayload("D")));
398 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
399 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
400 lastAppliedDuringSnapshotCapture, 1);
401 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
403 // add more entries after snapshot is taken
404 List<ReplicatedLogEntry> entries = new ArrayList<>();
405 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
406 new MockRaftActorContext.MockPayload("F"));
407 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
408 new MockRaftActorContext.MockPayload("G"));
409 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
410 new MockRaftActorContext.MockPayload("H"));
415 int lastAppliedToState = 5;
418 InMemoryJournal.addEntry(persistenceId, 5, entry2);
419 // 2 entries are applied to state besides the 4 entries in snapshot
420 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
421 InMemoryJournal.addEntry(persistenceId, 7, entry3);
422 InMemoryJournal.addEntry(persistenceId, 8, entry4);
425 followerActor.tell(PoisonPill.getInstance(), null);
426 expectMsgClass(duration("5 seconds"), Terminated.class);
428 unwatch(followerActor);
430 //reinstate the actor
431 TestActorRef<MockRaftActor> ref = factory.createTestActor(
432 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
433 Optional.<ConfigParams>of(config)));
435 ref.underlyingActor().waitForRecoveryComplete();
437 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
438 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
439 context.getReplicatedLog().size());
440 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
441 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
442 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
443 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
448 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
449 new JavaTestKit(getSystem()) {{
450 String persistenceId = factory.generateActorId("leader-");
452 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
453 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
455 // Setup the persisted journal with some entries
456 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
457 new MockRaftActorContext.MockPayload("zero"));
458 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
459 new MockRaftActorContext.MockPayload("oen"));
460 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
461 new MockRaftActorContext.MockPayload("two"));
464 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
465 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
466 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
467 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
469 int lastAppliedToState = 1;
472 //reinstate the actor
473 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
474 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
475 Optional.<ConfigParams>of(config)));
477 leaderActor.underlyingActor().waitForRecoveryComplete();
479 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
480 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
481 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
482 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
483 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
488 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
489 * process recovery messages
495 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
496 new JavaTestKit(getSystem()) {
498 String persistenceId = factory.generateActorId("leader-");
500 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
502 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
504 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
505 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
507 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
509 // Wait for akka's recovery to complete so it doesn't interfere.
510 mockRaftActor.waitForRecoveryComplete();
512 ByteString snapshotBytes = fromObject(Arrays.asList(
513 new MockRaftActorContext.MockPayload("A"),
514 new MockRaftActorContext.MockPayload("B"),
515 new MockRaftActorContext.MockPayload("C"),
516 new MockRaftActorContext.MockPayload("D")));
518 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
519 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
521 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
523 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
525 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
527 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
529 assertEquals("add replicated log entry", 1, replicatedLog.size());
531 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
533 assertEquals("add replicated log entry", 2, replicatedLog.size());
535 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
537 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
539 // The snapshot had 4 items + we added 2 more items during the test
540 // We start removing from 5 and we should get 1 item in the replicated log
541 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
543 assertEquals("remove log entries", 1, replicatedLog.size());
545 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
547 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
548 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
550 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
556 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
557 * not process recovery messages
562 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
563 new JavaTestKit(getSystem()) {
565 String persistenceId = factory.generateActorId("leader-");
567 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
569 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
571 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
572 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
574 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
576 // Wait for akka's recovery to complete so it doesn't interfere.
577 mockRaftActor.waitForRecoveryComplete();
579 ByteString snapshotBytes = fromObject(Arrays.asList(
580 new MockRaftActorContext.MockPayload("A"),
581 new MockRaftActorContext.MockPayload("B"),
582 new MockRaftActorContext.MockPayload("C"),
583 new MockRaftActorContext.MockPayload("D")));
585 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
586 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
588 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
590 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
592 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
594 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
596 assertEquals("add replicated log entry", 0, replicatedLog.size());
598 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
600 assertEquals("add replicated log entry", 0, replicatedLog.size());
602 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
604 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
606 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
608 assertEquals("remove log entries", 0, replicatedLog.size());
610 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
612 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
613 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
615 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
621 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
622 new JavaTestKit(getSystem()) {
624 String persistenceId = factory.generateActorId("leader-");
626 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
628 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630 CountDownLatch persistLatch = new CountDownLatch(1);
631 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
632 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
634 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
635 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
637 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
639 mockRaftActor.waitForInitializeBehaviorComplete();
641 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
643 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
649 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
650 new JavaTestKit(getSystem()) {
652 String persistenceId = factory.generateActorId("leader-");
654 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
656 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
658 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
660 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
661 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
663 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
665 mockRaftActor.waitForInitializeBehaviorComplete();
667 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
669 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
671 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
677 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
678 new JavaTestKit(getSystem()) {
680 String persistenceId = factory.generateActorId("leader-");
682 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
684 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
686 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
688 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
689 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
691 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
693 mockRaftActor.waitForInitializeBehaviorComplete();
695 mockRaftActor.waitUntilLeader();
697 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
699 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
701 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
707 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
708 new JavaTestKit(getSystem()) {
710 String persistenceId = factory.generateActorId("leader-");
712 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
714 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
716 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
718 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
719 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
721 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
723 mockRaftActor.waitForInitializeBehaviorComplete();
725 mockRaftActor.waitUntilLeader();
727 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
729 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
737 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
738 new JavaTestKit(getSystem()) {
740 String persistenceId = factory.generateActorId("leader-");
742 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
744 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
746 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
748 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
749 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
750 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
752 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
754 mockRaftActor.waitForInitializeBehaviorComplete();
756 ByteString snapshotBytes = fromObject(Arrays.asList(
757 new MockRaftActorContext.MockPayload("A"),
758 new MockRaftActorContext.MockPayload("B"),
759 new MockRaftActorContext.MockPayload("C"),
760 new MockRaftActorContext.MockPayload("D")));
762 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
764 raftActorContext.getSnapshotManager().capture(
765 new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
766 new MockRaftActorContext.MockPayload("D")), -1);
768 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
770 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
772 verify(dataPersistenceProvider).saveSnapshot(anyObject());
779 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
780 new JavaTestKit(getSystem()) {
782 String persistenceId = factory.generateActorId("leader-");
784 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
786 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
788 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
790 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
791 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
793 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
795 mockRaftActor.waitForInitializeBehaviorComplete();
796 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
798 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
799 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
800 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
801 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
802 mockRaftActor.getReplicatedLog().append(lastEntry);
804 ByteString snapshotBytes = fromObject(Arrays.asList(
805 new MockRaftActorContext.MockPayload("A"),
806 new MockRaftActorContext.MockPayload("B"),
807 new MockRaftActorContext.MockPayload("C"),
808 new MockRaftActorContext.MockPayload("D")));
810 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
811 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
813 long replicatedToAllIndex = 1;
815 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
817 verify(mockRaftActor.delegate).createSnapshot();
819 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
821 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
823 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
825 verify(dataPersistenceProvider).deleteMessages(100);
827 assertEquals(3, mockRaftActor.getReplicatedLog().size());
828 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
830 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
831 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
832 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
834 // Index 2 will not be in the log because it was removed due to snapshotting
835 assertNull(mockRaftActor.getReplicatedLog().get(1));
836 assertNull(mockRaftActor.getReplicatedLog().get(0));
843 public void testApplyState() throws Exception {
845 new JavaTestKit(getSystem()) {
847 String persistenceId = factory.generateActorId("leader-");
849 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
851 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
853 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
855 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
856 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
858 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
860 mockRaftActor.waitForInitializeBehaviorComplete();
862 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
863 new MockRaftActorContext.MockPayload("F"));
865 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
867 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
874 public void testApplySnapshot() throws Exception {
875 new JavaTestKit(getSystem()) {
877 String persistenceId = factory.generateActorId("leader-");
879 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
881 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
883 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
885 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
886 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
888 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
890 mockRaftActor.waitForInitializeBehaviorComplete();
892 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
894 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
895 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
896 oldReplicatedLog.append(
897 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
898 mock(Payload.class)));
900 ByteString snapshotBytes = fromObject(Arrays.asList(
901 new MockRaftActorContext.MockPayload("A"),
902 new MockRaftActorContext.MockPayload("B"),
903 new MockRaftActorContext.MockPayload("C"),
904 new MockRaftActorContext.MockPayload("D")));
906 Snapshot snapshot = mock(Snapshot.class);
908 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
910 doReturn(3L).when(snapshot).getLastAppliedIndex();
912 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
914 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
916 assertTrue("The replicatedLog should have changed",
917 oldReplicatedLog != mockRaftActor.getReplicatedLog());
919 assertEquals("lastApplied should be same as in the snapshot",
920 (Long) 3L, mockRaftActor.getLastApplied());
922 assertEquals(0, mockRaftActor.getReplicatedLog().size());
929 public void testSaveSnapshotFailure() throws Exception {
930 new JavaTestKit(getSystem()) {
932 String persistenceId = factory.generateActorId("leader-");
934 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
936 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
938 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
940 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
941 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
943 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
945 mockRaftActor.waitForInitializeBehaviorComplete();
947 ByteString snapshotBytes = fromObject(Arrays.asList(
948 new MockRaftActorContext.MockPayload("A"),
949 new MockRaftActorContext.MockPayload("B"),
950 new MockRaftActorContext.MockPayload("C"),
951 new MockRaftActorContext.MockPayload("D")));
953 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
955 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
957 raftActorContext.getSnapshotManager().capture(
958 new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
959 new MockRaftActorContext.MockPayload("D")), 1);
961 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
963 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
966 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
967 mockRaftActor.getReplicatedLog().getSnapshotIndex());
974 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
975 new JavaTestKit(getSystem()) {{
976 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
977 Props.create(MessageCollectorActor.class));
978 MessageCollectorActor.waitUntilReady(notifierActor);
980 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
981 long heartBeatInterval = 100;
982 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
983 config.setElectionTimeoutFactor(20);
985 String persistenceId = factory.generateActorId("notifier-");
987 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
988 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
989 new NonPersistentProvider()), persistenceId);
991 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
994 // check if the notifier got a role change from null to Follower
995 RoleChanged raftRoleChanged = matches.get(0);
996 assertEquals(persistenceId, raftRoleChanged.getMemberId());
997 assertNull(raftRoleChanged.getOldRole());
998 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1000 // check if the notifier got a role change from Follower to Candidate
1001 raftRoleChanged = matches.get(1);
1002 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1003 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1004 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1006 // check if the notifier got a role change from Candidate to Leader
1007 raftRoleChanged = matches.get(2);
1008 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1009 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1010 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
1012 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1013 notifierActor, LeaderStateChanged.class);
1015 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
1017 notifierActor.underlyingActor().clear();
1019 MockRaftActor raftActor = raftActorRef.underlyingActor();
1020 final String newLeaderId = "new-leader";
1021 Follower follower = new Follower(raftActor.getRaftActorContext()) {
1023 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1024 leaderId = newLeaderId;
1029 raftActor.changeCurrentBehavior(follower);
1031 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1032 assertEquals(persistenceId, leaderStateChange.getMemberId());
1033 assertEquals(null, leaderStateChange.getLeaderId());
1035 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1036 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1037 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1039 notifierActor.underlyingActor().clear();
1041 raftActor.handleCommand("any");
1043 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1044 assertEquals(persistenceId, leaderStateChange.getMemberId());
1045 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
1050 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1051 new JavaTestKit(getSystem()) {{
1052 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1053 MessageCollectorActor.waitUntilReady(notifierActor);
1055 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1056 long heartBeatInterval = 100;
1057 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1058 config.setElectionTimeoutFactor(1);
1060 String persistenceId = factory.generateActorId("notifier-");
1062 factory.createActor(MockRaftActor.props(persistenceId,
1063 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
1065 List<RoleChanged> matches = null;
1066 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1067 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1068 assertNotNull(matches);
1069 if(matches.size() == 3) {
1072 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1075 assertEquals(2, matches.size());
1077 // check if the notifier got a role change from null to Follower
1078 RoleChanged raftRoleChanged = matches.get(0);
1079 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1080 assertNull(raftRoleChanged.getOldRole());
1081 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1083 // check if the notifier got a role change from Follower to Candidate
1084 raftRoleChanged = matches.get(1);
1085 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1086 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1087 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1093 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1094 new JavaTestKit(getSystem()) {
1096 String persistenceId = factory.generateActorId("leader-");
1097 String follower1Id = factory.generateActorId("follower-");
1099 ActorRef followerActor1 =
1100 factory.createActor(Props.create(MessageCollectorActor.class));
1102 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1103 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1104 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1106 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1108 Map<String, String> peerAddresses = new HashMap<>();
1109 peerAddresses.put(follower1Id, followerActor1.path().toString());
1111 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1112 MockRaftActor.props(persistenceId, peerAddresses,
1113 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1115 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1117 leaderActor.getRaftActorContext().setCommitIndex(4);
1118 leaderActor.getRaftActorContext().setLastApplied(4);
1119 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1121 leaderActor.waitForInitializeBehaviorComplete();
1123 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1125 Leader leader = new Leader(leaderActor.getRaftActorContext());
1126 leaderActor.setCurrentBehavior(leader);
1127 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1129 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1130 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1132 assertEquals(8, leaderActor.getReplicatedLog().size());
1134 leaderActor.getRaftActorContext().getSnapshotManager()
1135 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1136 new MockRaftActorContext.MockPayload("x")), 4);
1138 verify(leaderActor.delegate).createSnapshot();
1140 assertEquals(8, leaderActor.getReplicatedLog().size());
1142 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1143 //fake snapshot on index 5
1144 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1146 assertEquals(8, leaderActor.getReplicatedLog().size());
1148 //fake snapshot on index 6
1149 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1150 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1151 assertEquals(8, leaderActor.getReplicatedLog().size());
1153 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1155 assertEquals(8, leaderActor.getReplicatedLog().size());
1157 ByteString snapshotBytes = fromObject(Arrays.asList(
1158 new MockRaftActorContext.MockPayload("foo-0"),
1159 new MockRaftActorContext.MockPayload("foo-1"),
1160 new MockRaftActorContext.MockPayload("foo-2"),
1161 new MockRaftActorContext.MockPayload("foo-3"),
1162 new MockRaftActorContext.MockPayload("foo-4")));
1164 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
1165 , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1167 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1169 // The commit is needed to complete the snapshot creation process
1170 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
1172 // capture snapshot reply should remove the snapshotted entries only
1173 assertEquals(3, leaderActor.getReplicatedLog().size());
1174 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1176 // add another non-replicated entry
1177 leaderActor.getReplicatedLog().append(
1178 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1180 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1181 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1182 assertEquals(2, leaderActor.getReplicatedLog().size());
1183 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1190 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1191 new JavaTestKit(getSystem()) {
1193 String persistenceId = factory.generateActorId("follower-");
1194 String leaderId = factory.generateActorId("leader-");
1197 ActorRef leaderActor1 =
1198 factory.createActor(Props.create(MessageCollectorActor.class));
1200 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1201 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1202 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1204 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1206 Map<String, String> peerAddresses = new HashMap<>();
1207 peerAddresses.put(leaderId, leaderActor1.path().toString());
1209 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1210 MockRaftActor.props(persistenceId, peerAddresses,
1211 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1213 MockRaftActor followerActor = mockActorRef.underlyingActor();
1214 followerActor.getRaftActorContext().setCommitIndex(4);
1215 followerActor.getRaftActorContext().setLastApplied(4);
1216 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1218 followerActor.waitForInitializeBehaviorComplete();
1221 Follower follower = new Follower(followerActor.getRaftActorContext());
1222 followerActor.setCurrentBehavior(follower);
1223 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1225 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1226 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1227 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1229 // log has indices 0-5
1230 assertEquals(6, followerActor.getReplicatedLog().size());
1233 followerActor.getRaftActorContext().getSnapshotManager().capture(
1234 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1235 new MockRaftActorContext.MockPayload("D")), 4);
1237 verify(followerActor.delegate).createSnapshot();
1239 assertEquals(6, followerActor.getReplicatedLog().size());
1241 //fake snapshot on index 6
1242 List<ReplicatedLogEntry> entries =
1244 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1245 new MockRaftActorContext.MockPayload("foo-6"))
1247 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1248 assertEquals(7, followerActor.getReplicatedLog().size());
1250 //fake snapshot on index 7
1251 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1255 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1256 new MockRaftActorContext.MockPayload("foo-7"))
1258 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1259 assertEquals(8, followerActor.getReplicatedLog().size());
1261 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1264 ByteString snapshotBytes = fromObject(Arrays.asList(
1265 new MockRaftActorContext.MockPayload("foo-0"),
1266 new MockRaftActorContext.MockPayload("foo-1"),
1267 new MockRaftActorContext.MockPayload("foo-2"),
1268 new MockRaftActorContext.MockPayload("foo-3"),
1269 new MockRaftActorContext.MockPayload("foo-4")));
1270 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1271 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1273 // The commit is needed to complete the snapshot creation process
1274 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
1276 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1277 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1278 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1282 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1283 new MockRaftActorContext.MockPayload("foo-7"))
1285 // send an additional entry 8 with leaderCommit = 7
1286 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1288 // 7 and 8, as lastapplied is 7
1289 assertEquals(2, followerActor.getReplicatedLog().size());
1296 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1297 new JavaTestKit(getSystem()) {
1299 String persistenceId = factory.generateActorId("leader-");
1300 String follower1Id = factory.generateActorId("follower-");
1301 String follower2Id = factory.generateActorId("follower-");
1303 ActorRef followerActor1 =
1304 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1305 ActorRef followerActor2 =
1306 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1308 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1309 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1310 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1312 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1314 Map<String, String> peerAddresses = new HashMap<>();
1315 peerAddresses.put(follower1Id, followerActor1.path().toString());
1316 peerAddresses.put(follower2Id, followerActor2.path().toString());
1318 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1319 MockRaftActor.props(persistenceId, peerAddresses,
1320 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1322 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1323 leaderActor.getRaftActorContext().setCommitIndex(9);
1324 leaderActor.getRaftActorContext().setLastApplied(9);
1325 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1327 leaderActor.waitForInitializeBehaviorComplete();
1329 Leader leader = new Leader(leaderActor.getRaftActorContext());
1330 leaderActor.setCurrentBehavior(leader);
1331 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1333 // create 5 entries in the log
1334 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1335 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1337 //set the snapshot index to 4 , 0 to 4 are snapshotted
1338 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1339 //setting replicatedToAllIndex = 9, for the log to clear
1340 leader.setReplicatedToAllIndex(9);
1341 assertEquals(5, leaderActor.getReplicatedLog().size());
1342 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1344 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1345 assertEquals(5, leaderActor.getReplicatedLog().size());
1346 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1348 // set the 2nd follower nextIndex to 1 which has been snapshotted
1349 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1350 assertEquals(5, leaderActor.getReplicatedLog().size());
1351 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1353 // simulate a real snapshot
1354 leaderActor.onReceiveCommand(new SendHeartBeat());
1355 assertEquals(5, leaderActor.getReplicatedLog().size());
1356 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1357 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1358 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1361 //reply from a slow follower does not initiate a fake snapshot
1362 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1363 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1365 ByteString snapshotBytes = fromObject(Arrays.asList(
1366 new MockRaftActorContext.MockPayload("foo-0"),
1367 new MockRaftActorContext.MockPayload("foo-1"),
1368 new MockRaftActorContext.MockPayload("foo-2"),
1369 new MockRaftActorContext.MockPayload("foo-3"),
1370 new MockRaftActorContext.MockPayload("foo-4")));
1371 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1372 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1374 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1376 //reply from a slow follower after should not raise errors
1377 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1378 assertEquals(0, leaderActor.getReplicatedLog().size());
1384 private static class NonPersistentProvider implements DataPersistenceProvider {
1386 public boolean isRecoveryApplicable() {
1391 public <T> void persist(T o, Procedure<T> procedure) {
1394 } catch (Exception e) {
1395 e.printStackTrace();
1400 public void saveSnapshot(Object o) {
1405 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1410 public void deleteMessages(long sequenceNumber) {
1416 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1417 new JavaTestKit(getSystem()) {{
1418 String persistenceId = factory.generateActorId("leader-");
1419 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1420 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1421 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1422 config.setSnapshotBatchCount(5);
1424 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1426 Map<String, String> peerAddresses = new HashMap<>();
1428 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1429 MockRaftActor.props(persistenceId, peerAddresses,
1430 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1432 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1433 leaderActor.getRaftActorContext().setCommitIndex(3);
1434 leaderActor.getRaftActorContext().setLastApplied(3);
1435 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1437 leaderActor.waitForInitializeBehaviorComplete();
1438 for(int i=0;i< 4;i++) {
1439 leaderActor.getReplicatedLog()
1440 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1441 new MockRaftActorContext.MockPayload("A")));
1444 Leader leader = new Leader(leaderActor.getRaftActorContext());
1445 leaderActor.setCurrentBehavior(leader);
1446 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1448 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1449 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1451 // Now send a CaptureSnapshotReply
1452 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1454 // Trimming log in this scenario is a no-op
1455 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1456 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1457 assertEquals(-1, leader.getReplicatedToAllIndex());
1463 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1464 new JavaTestKit(getSystem()) {{
1465 String persistenceId = factory.generateActorId("leader-");
1466 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1467 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1468 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1469 config.setSnapshotBatchCount(5);
1471 DataPersistenceProvider dataPersistenceProvider = new NonPersistentProvider();
1473 Map<String, String> peerAddresses = new HashMap<>();
1475 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1476 MockRaftActor.props(persistenceId, peerAddresses,
1477 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1479 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1480 leaderActor.getRaftActorContext().setCommitIndex(3);
1481 leaderActor.getRaftActorContext().setLastApplied(3);
1482 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1483 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1485 leaderActor.waitForInitializeBehaviorComplete();
1486 Leader leader = new Leader(leaderActor.getRaftActorContext());
1487 leaderActor.setCurrentBehavior(leader);
1488 leader.setReplicatedToAllIndex(3);
1489 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1491 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1492 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1494 // Now send a CaptureSnapshotReply
1495 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1497 // Trimming log in this scenario is a no-op
1498 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1499 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1500 assertEquals(3, leader.getReplicatedToAllIndex());
1505 private ByteString fromObject(Object snapshot) throws Exception {
1506 ByteArrayOutputStream b = null;
1507 ObjectOutputStream o = null;
1509 b = new ByteArrayOutputStream();
1510 o = new ObjectOutputStream(b);
1511 o.writeObject(snapshot);
1512 byte[] snapshotBytes = b.toByteArray();
1513 return ByteString.copyFrom(snapshotBytes);