1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import com.google.protobuf.ByteString;
38 import java.io.ByteArrayInputStream;
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.io.ObjectInputStream;
42 import java.io.ObjectOutputStream;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import org.junit.After;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Test;
56 import org.opendaylight.controller.cluster.DataPersistenceProvider;
57 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
58 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
59 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
60 import org.opendaylight.controller.cluster.notifications.RoleChanged;
61 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
63 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
64 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
65 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
66 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
67 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
68 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
69 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
70 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
71 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
72 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
73 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
74 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
75 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
76 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
77 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
78 import scala.concurrent.Await;
79 import scala.concurrent.Future;
80 import scala.concurrent.duration.Duration;
81 import scala.concurrent.duration.FiniteDuration;
83 public class RaftActorTest extends AbstractActorTest {
// Factory the tests use to generate actor ids and to create (test) actors.
85 private TestActorFactory factory;
// Bind the factory to this test's actor system.
89 factory = new TestActorFactory(getSystem());
// Test cleanup: empties the static in-memory journal and snapshot store
// (presumably so persisted entries do not leak from one test into the next).
93 public void tearDown() throws Exception {
95 InMemoryJournal.clear();
96 InMemorySnapshotStore.clear();
// Test double for RaftActor. The overridden hooks forward to a Mockito mock ("delegate") so that
// tests can verify the calls, and two latches let tests wait for recovery and for behavior
// initialization.
99 public static class MockRaftActor extends RaftActor {
// Mockito mock of RaftActor that receives the forwarded hook calls.
101 private final RaftActor delegate;
// Released by onRecoveryComplete().
102 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Collects the list items read back from recovery snapshots (see applyRecoverySnapshot).
103 private final List<Object> state;
// May be null; getRoleChangeNotifier() wraps it with Optional.fromNullable.
104 private ActorRef roleChangeNotifier;
// Released once super.initializeBehavior() has returned.
105 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Akka Creator that carries the constructor arguments so the actor can be built through Props.
107 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
108 private static final long serialVersionUID = 1L;
109 private final Map<String, String> peerAddresses;
110 private final String id;
111 private final Optional<ConfigParams> config;
112 private final DataPersistenceProvider dataPersistenceProvider;
113 private final ActorRef roleChangeNotifier;
// Private: instances are only created by the props(...) factories below.
115 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
116 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
117 ActorRef roleChangeNotifier) {
118 this.peerAddresses = peerAddresses;
120 this.config = config;
121 this.dataPersistenceProvider = dataPersistenceProvider;
122 this.roleChangeNotifier = roleChangeNotifier;
// Builds the actor and then injects the role-change notifier, which is not a constructor argument.
126 public MockRaftActor create() throws Exception {
127 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
128 dataPersistenceProvider);
129 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
130 return mockRaftActor;
// Creates the actor with an empty recovered-state list and a fresh Mockito delegate.
134 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
135 DataPersistenceProvider dataPersistenceProvider) {
136 super(id, peerAddresses, config);
137 state = new ArrayList<>();
138 this.delegate = mock(RaftActor.class);
// No provider supplied: fall back to setPersistence(true); otherwise install the given provider.
139 if(dataPersistenceProvider == null){
140 setPersistence(true);
142 setPersistence(dataPersistenceProvider);
// Waits up to 5 seconds for onRecoveryComplete(); the assertion fails if the latch times out.
146 public void waitForRecoveryComplete() {
148 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
149 } catch (InterruptedException e) {
// Waits up to 5 seconds for initializeBehavior() to finish; the assertion fails on timeout.
154 public void waitForInitializeBehaviorComplete() {
156 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
157 } catch (InterruptedException e) {
// Polls at most 10 times, sleeping 100 ms per iteration.
// NOTE(review): the loop's exit condition is not visible in this listing; presumably it
// checks whether this actor is the leader - confirm.
163 public void waitUntilLeader(){
164 for(int i = 0;i < 10; i++){
168 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
172 public List<Object> getState() {
// Props factories. Each overload passes null to the creator for the argument it does not take
// (data persistence provider and/or role-change notifier).
176 public static Props props(final String id, final Map<String, String> peerAddresses,
177 Optional<ConfigParams> config){
178 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
181 public static Props props(final String id, final Map<String, String> peerAddresses,
182 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
183 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
186 public static Props props(final String id, final Map<String, String> peerAddresses,
187 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
188 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
191 public static Props props(final String id, final Map<String, String> peerAddresses,
192 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
193 DataPersistenceProvider dataPersistenceProvider){
194 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
// Forwards to the delegate so tests can verify the call, then logs it.
198 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
199 delegate.applyState(clientActor, identifier, data);
200 LOG.info("{}: applyState called", persistenceId());
// NOTE(review): the bodies of the next three recovery hooks are not visible in this listing.
204 protected void startLogRecoveryBatch(int maxBatchSize) {
208 protected void appendRecoveredLogEntry(Payload data) {
213 protected void applyCurrentLogRecoveryBatch() {
// Forwards to the delegate and releases the latch awaited by waitForRecoveryComplete().
217 protected void onRecoveryComplete() {
218 delegate.onRecoveryComplete();
219 recoveryComplete.countDown();
// Runs the real initialization first, then releases the latch awaited by
// waitForInitializeBehaviorComplete().
223 protected void initializeBehavior() {
224 super.initializeBehavior();
225 initializeBehaviorComplete.countDown();
// Forwards to the delegate, then deserializes the snapshot bytes; when the result is a List
// its elements are appended to the recovered state.
229 protected void applyRecoverySnapshot(byte[] bytes) {
230 delegate.applyRecoverySnapshot(bytes);
232 Object data = toObject(bytes);
233 if (data instanceof List) {
234 state.addAll((List<?>) data);
236 } catch (Exception e) {
// Logs the call and forwards it to the delegate.
241 @Override protected void createSnapshot() {
242 LOG.info("{}: createSnapshot called", persistenceId());
243 delegate.createSnapshot();
// Logs the call and forwards it to the delegate.
246 @Override protected void applySnapshot(byte [] snapshot) {
247 LOG.info("{}: applySnapshot called", persistenceId());
248 delegate.applySnapshot(snapshot);
// Forwards to the delegate.
251 @Override protected void onStateChanged() {
252 delegate.onStateChanged();
// Absent when no notifier was supplied through props(...).
256 protected Optional<ActorRef> getRoleChangeNotifier() {
257 return Optional.fromNullable(roleChangeNotifier);
260 @Override public String persistenceId() {
// Deserializes a Java-serialized object from the given bytes using an ObjectInputStream.
264 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
266 ByteArrayInputStream bis = null;
267 ObjectInputStream ois = null;
269 bis = new ByteArrayInputStream(bs);
270 ois = new ObjectInputStream(bis);
271 obj = ois.readObject();
// Shortcut to the replicated log held by the actor's RaftActorContext.
283 public ReplicatedLog getReplicatedLog(){
284 return this.getRaftActorContext().getReplicatedLog();
// JavaTestKit that starts a MockRaftActor under the given name, with no peers and no explicit
// config, and offers helpers to wait for a log message or for a leader to be known.
290 public static class RaftActorTestKit extends JavaTestKit {
291 private final ActorRef raftActor;
293 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
// The actor name doubles as the raft actor id.
296 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
297 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
302 public ActorRef getRaftActor() {
// Uses an EventFilter scoped to the raft actor's path and expecting exactly one occurrence.
// NOTE(review): how `message` is matched and what is returned is not visible in this listing.
306 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
307 // Wait for a specific log message to show up
309 new JavaTestKit.EventFilter<Boolean>(logEventClass
312 protected Boolean run() {
315 }.from(raftActor.path().toString())
317 .occurrences(1).exec();
// Waits until the actor created by this kit reports a leader.
322 protected void waitUntilLeader(){
323 waitUntilLeader(raftActor);
// Repeatedly asks actorRef with FindLeader until the reply carries a non-null leader actor.
// Up to 100 attempts (20 * 5), each with a 100 ms ask timeout and a 50 ms pause between
// attempts; fails the test if no leader was reported.
326 public static void waitUntilLeader(ActorRef actorRef) {
327 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
328 for(int i = 0; i < 20 * 5; i++) {
329 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
331 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
// NOTE(review): the statement executed on success is not visible here; presumably it returns.
332 if(resp.getLeaderActor() != null) {
// A timeout is deliberately ignored so the loop simply retries.
335 } catch(TimeoutException e) {
336 } catch(Exception e) {
337 System.err.println("FindLeader threw ex");
342 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
345 Assert.fail("Leader not found for actorRef " + actorRef.path());
// Smoke test: a freshly created raft actor with no peers eventually reports a leader
// (waitUntilLeader fails the test otherwise).
352 public void testConstruction() {
353 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
// A raft actor with no peers must answer FindLeader with a non-null leader; the check is
// performed inside waitUntilLeader().
357 public void testFindLeaderWhenLeaderIsSelf(){
358 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
359 kit.waitUntilLeader();
// Seeds the in-memory snapshot store and journal for a persistence id, re-creates the actor
// with that id, and checks the recovered log size, last index, last applied index, commit
// index and recovered state size.
363 public void testRaftActorRecovery() throws Exception {
364 new JavaTestKit(getSystem()) {{
365 String persistenceId = factory.generateActorId("follower-");
367 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
368 // Set the heartbeat interval high to essentially disable election otherwise the test
369 // may fail if the actor is switched to Leader and the commitIndex is set to the last
371 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// First incarnation of the actor; it is stopped below before the recovery run.
373 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
374 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
376 watch(followerActor);
// One log entry (index 4) is stored in the snapshot as not yet applied.
378 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
379 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
380 new MockRaftActorContext.MockPayload("E"));
381 snapshotUnappliedEntries.add(entry1);
383 int lastAppliedDuringSnapshotCapture = 3;
384 int lastIndexDuringSnapshotCapture = 4;
386 // 4 messages as part of snapshot, which are applied to state
387 ByteString snapshotBytes = fromObject(Arrays.asList(
388 new MockRaftActorContext.MockPayload("A"),
389 new MockRaftActorContext.MockPayload("B"),
390 new MockRaftActorContext.MockPayload("C"),
391 new MockRaftActorContext.MockPayload("D")));
393 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
394 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
395 lastAppliedDuringSnapshotCapture, 1);
396 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
398 // add more entries after snapshot is taken
399 List<ReplicatedLogEntry> entries = new ArrayList<>();
400 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
401 new MockRaftActorContext.MockPayload("F"));
402 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
403 new MockRaftActorContext.MockPayload("G"));
404 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
405 new MockRaftActorContext.MockPayload("H"));
// NOTE(review): the statements that fill `entries` and declare `lastIndex` (both used in the
// assertions below) are not visible in this listing.
410 int lastAppliedToState = 5;
// Journal sequence numbers 5..8; the ApplyJournalEntries record sits between entry2 and entry3.
413 InMemoryJournal.addEntry(persistenceId, 5, entry2);
414 // 2 entries are applied to state besides the 4 entries in snapshot
415 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
416 InMemoryJournal.addEntry(persistenceId, 7, entry3);
417 InMemoryJournal.addEntry(persistenceId, 8, entry4);
// Stop the first incarnation and wait (up to 5 seconds) for its Terminated message.
420 followerActor.tell(PoisonPill.getInstance(), null);
421 expectMsgClass(duration("5 seconds"), Terminated.class);
423 unwatch(followerActor);
425 //reinstate the actor
426 TestActorRef<MockRaftActor> ref = factory.createTestActor(
427 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
428 Optional.<ConfigParams>of(config)));
430 ref.underlyingActor().waitForRecoveryComplete();
// The recovered log must hold the snapshot's unapplied entries plus the journaled entries.
432 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
433 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
434 context.getReplicatedLog().size());
435 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
436 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
437 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
438 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
// Recovery from a journal that contains an ApplyLogEntries record (per the test name, the
// pre-Lithium form of the apply message): checks log size, last index, last applied index and
// commit index after recovery.
443 public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
444 new JavaTestKit(getSystem()) {{
445 String persistenceId = factory.generateActorId("leader-");
447 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
448 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
450 // Setup the persisted journal with some entries
451 ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
452 new MockRaftActorContext.MockPayload("zero"));
453 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
454 new MockRaftActorContext.MockPayload("oen"));
455 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
456 new MockRaftActorContext.MockPayload("two"));
// NOTE(review): the declarations of `seqNr` and `lastIndex` are not visible in this listing.
// ApplyLogEntries(1) is journaled after entry1 and before entry2.
459 InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
460 InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
461 InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
462 InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
464 int lastAppliedToState = 1;
467 //reinstate the actor
468 TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
469 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
470 Optional.<ConfigParams>of(config)));
472 leaderActor.underlyingActor().waitForRecoveryComplete();
// All three journaled entries are expected in the log; applied/commit index follow ApplyLogEntries(1).
474 RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
475 assertEquals("Journal log size", 3, context.getReplicatedLog().size());
476 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
477 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
478 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
483 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
484 * process recovery messages
490 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
491 new JavaTestKit(getSystem()) {
493 String persistenceId = factory.generateActorId("leader-");
495 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
497 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// No DataPersistenceProvider is passed, so MockRaftActor calls setPersistence(true).
499 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
500 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
502 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
504 // Wait for akka's recovery to complete so it doesn't interfere.
505 mockRaftActor.waitForRecoveryComplete();
507 ByteString snapshotBytes = fromObject(Arrays.asList(
508 new MockRaftActorContext.MockPayload("A"),
509 new MockRaftActorContext.MockPayload("B"),
510 new MockRaftActorContext.MockPayload("C"),
511 new MockRaftActorContext.MockPayload("D")));
513 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
514 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// From here on the recovery messages are fed to the actor by hand, one type at a time,
// and the effect of each is asserted.
516 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
// The snapshot offer must reach the applyRecoverySnapshot hook with the same bytes.
518 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
520 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
522 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
524 assertEquals("add replicated log entry", 1, replicatedLog.size());
526 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
528 assertEquals("add replicated log entry", 2, replicatedLog.size());
530 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
532 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
534 // The snapshot had 4 items + we added 2 more items during the test
535 // We start removing from 5 and we should get 1 item in the replicated log
536 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
538 assertEquals("remove log entries", 1, replicatedLog.size());
// A recovered UpdateElectionTerm must be reflected in the term information.
540 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
542 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
543 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
// No assertion follows in the visible lines; the call only has to complete.
545 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
551 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
552 * not process recovery messages
557 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
558 new JavaTestKit(getSystem()) {
560 String persistenceId = factory.generateActorId("leader-");
562 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
564 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// A DataPersistenceProviderMonitor is installed as the persistence provider.
// NOTE(review): presumably it reports recovery as not applicable - confirm against that class.
566 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
567 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
569 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
571 // Wait for akka's recovery to complete so it doesn't interfere.
572 mockRaftActor.waitForRecoveryComplete();
574 ByteString snapshotBytes = fromObject(Arrays.asList(
575 new MockRaftActorContext.MockPayload("A"),
576 new MockRaftActorContext.MockPayload("B"),
577 new MockRaftActorContext.MockPayload("C"),
578 new MockRaftActorContext.MockPayload("D")));
580 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
581 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1, 3, 1);
// Each recovery message below is expected to be ignored: no hook call, no log growth,
// commit index still -1, term information unchanged.
583 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
585 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
587 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
589 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
591 assertEquals("add replicated log entry", 0, replicatedLog.size());
593 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
595 assertEquals("add replicated log entry", 0, replicatedLog.size());
597 mockRaftActor.onReceiveRecover(new ApplyJournalEntries(1));
599 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
601 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
603 assertEquals("remove log entries", 0, replicatedLog.size());
605 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
607 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
608 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
// No assertion follows in the visible lines; the call only has to complete.
610 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
// Checks that updateAndPersist on the term information goes through the installed
// DataPersistenceProvider, observed via the monitor's persist latch.
616 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
617 new JavaTestKit(getSystem()) {
619 String persistenceId = factory.generateActorId("leader-");
621 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
623 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Latch handed to the monitor; NOTE(review): presumably counted down on persist - confirm.
625 CountDownLatch persistLatch = new CountDownLatch(1);
626 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
627 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
629 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
630 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
632 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
634 mockRaftActor.waitForInitializeBehaviorComplete();
636 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
// Fails if the latch has not been released within 5 seconds.
638 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
// Checks that appendAndPersist on the replicated log calls persist(...) on the (mocked)
// DataPersistenceProvider with the appended entry.
644 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
645 new JavaTestKit(getSystem()) {
647 String persistenceId = factory.generateActorId("leader-");
649 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
651 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
// Mockito mock, so the persist call can be verified afterwards.
653 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
655 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
656 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
658 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
660 mockRaftActor.waitForInitializeBehaviorComplete();
662 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
664 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
// The entry itself must be what is persisted; the callback argument is not inspected.
666 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
// Checks that appending and then removing a log entry both go through persist(...) on the
// (mocked) DataPersistenceProvider.
672 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
673 new JavaTestKit(getSystem()) {
675 String persistenceId = factory.generateActorId("leader-");
677 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
679 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
681 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
683 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
684 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
686 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
688 mockRaftActor.waitForInitializeBehaviorComplete();
690 mockRaftActor.waitUntilLeader();
692 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
694 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
// NOTE(review): three persist calls are expected although only two operations are issued
// above; presumably the third happens during startup/election - confirm.
696 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
// Checks that handling an ApplyJournalEntries command goes through persist(...) on the
// (mocked) DataPersistenceProvider.
702 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
703 new JavaTestKit(getSystem()) {
705 String persistenceId = factory.generateActorId("leader-");
707 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
709 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
711 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
713 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
714 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
716 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
718 mockRaftActor.waitForInitializeBehaviorComplete();
720 mockRaftActor.waitUntilLeader();
722 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
// NOTE(review): two persist calls are expected for one command; presumably one of them
// happens during startup/election - confirm.
724 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
// Checks that a CaptureSnapshotReply, received after a capture was started, results in
// saveSnapshot(...) on the (mocked) DataPersistenceProvider.
732 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
733 new JavaTestKit(getSystem()) {
735 String persistenceId = factory.generateActorId("leader-");
737 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
739 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
741 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
743 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
744 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
745 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
747 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
749 mockRaftActor.waitForInitializeBehaviorComplete();
751 ByteString snapshotBytes = fromObject(Arrays.asList(
752 new MockRaftActorContext.MockPayload("A"),
753 new MockRaftActorContext.MockPayload("B"),
754 new MockRaftActorContext.MockPayload("C"),
755 new MockRaftActorContext.MockPayload("D")));
757 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
// Start a capture first so that the reply below has a capture in progress to complete.
759 raftActorContext.getSnapshotManager().capture(
760 new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
761 new MockRaftActorContext.MockPayload("D")), -1);
// Force the Leader behavior before delivering the reply.
763 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
765 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
767 verify(dataPersistenceProvider).saveSnapshot(anyObject());
// Drives a full snapshot cycle (capture -> CaptureSnapshotReply -> SaveSnapshotSuccess) and
// checks that old snapshots and journal messages are deleted through the (mocked)
// DataPersistenceProvider and that the replicated log is trimmed.
774 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
775 new JavaTestKit(getSystem()) {
777 String persistenceId = factory.generateActorId("leader-");
779 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
781 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
783 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
// Unlike the other tests, this actor is given one peer ("leader" at a fake path).
785 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
786 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
788 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
790 mockRaftActor.waitForInitializeBehaviorComplete();
791 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
// Fill the log with entries at indexes 0..4 (lastEntry is index 4).
793 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
794 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
795 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
796 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
797 mockRaftActor.getReplicatedLog().append(lastEntry);
799 ByteString snapshotBytes = fromObject(Arrays.asList(
800 new MockRaftActorContext.MockPayload("A"),
801 new MockRaftActorContext.MockPayload("B"),
802 new MockRaftActorContext.MockPayload("C"),
803 new MockRaftActorContext.MockPayload("D")));
805 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
806 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
808 long replicatedToAllIndex = 1;
810 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
// Starting the capture must trigger the createSnapshot hook.
812 verify(mockRaftActor.delegate).createSnapshot();
814 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
816 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
818 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
// 100 matches the sequence number given in the SnapshotMetadata above.
820 verify(dataPersistenceProvider).deleteMessages(100);
822 assertEquals(3, mockRaftActor.getReplicatedLog().size());
823 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
// Entries above replicatedToAllIndex (indexes 2, 3 and 4) must still be in the log.
825 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
826 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
827 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
829 // Indexes 0 and 1 will not be in the log because they were removed due to snapshotting
830 assertNull(mockRaftActor.getReplicatedLog().get(1));
831 assertNull(mockRaftActor.getReplicatedLog().get(0));
// Checks that an ApplyState command reaches the applyState hook with the same client actor
// and identifier.
838 public void testApplyState() throws Exception {
840 new JavaTestKit(getSystem()) {
842 String persistenceId = factory.generateActorId("leader-");
844 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
846 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
848 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
850 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
851 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
853 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
855 mockRaftActor.waitForInitializeBehaviorComplete();
857 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
858 new MockRaftActorContext.MockPayload("F"));
// The actor's own ref is used as the client actor.
860 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
// The data argument is not inspected (anyObject()).
862 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
// Checks that an ApplySnapshot command calls the applySnapshot hook with the snapshot state,
// replaces the replicated log with a new, empty one, and takes lastApplied from the snapshot.
869 public void testApplySnapshot() throws Exception {
870 new JavaTestKit(getSystem()) {
872 String persistenceId = factory.generateActorId("leader-");
874 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
876 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
878 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
880 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
881 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
883 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
885 mockRaftActor.waitForInitializeBehaviorComplete();
// Keep a reference to the current log so the replacement can be detected by identity below.
887 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
889 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
890 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
891 oldReplicatedLog.append(
892 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
893 mock(Payload.class)));
895 ByteString snapshotBytes = fromObject(Arrays.asList(
896 new MockRaftActorContext.MockPayload("A"),
897 new MockRaftActorContext.MockPayload("B"),
898 new MockRaftActorContext.MockPayload("C"),
899 new MockRaftActorContext.MockPayload("D")));
// Mocked snapshot: only getState() and getLastAppliedIndex() are stubbed.
901 Snapshot snapshot = mock(Snapshot.class);
903 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
905 doReturn(3L).when(snapshot).getLastAppliedIndex();
907 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
909 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
// Reference comparison: the log object itself must have been replaced.
911 assertTrue("The replicatedLog should have changed",
912 oldReplicatedLog != mockRaftActor.getReplicatedLog());
914 assertEquals("lastApplied should be same as in the snapshot",
915 (Long) 3L, mockRaftActor.getLastApplied());
// The three entries appended to the old log must not appear in the new one.
917 assertEquals(0, mockRaftActor.getReplicatedLog().size());
// Checks that a SaveSnapshotFailure received after a snapshot capture leaves the replicated log's
// snapshot index at -1, i.e. the index is not advanced when saving the snapshot fails.
924 public void testSaveSnapshotFailure() throws Exception {
925 new JavaTestKit(getSystem()) {
927 String persistenceId = factory.generateActorId("leader-");
929 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day heartbeat interval - presumably so no heartbeat fires while the test runs (confirm).
931 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
933 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
935 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
936 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
938 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
940 mockRaftActor.waitForInitializeBehaviorComplete();
// Snapshot state used for the CaptureSnapshotReply below: four payloads serialized via fromObject(...).
942 ByteString snapshotBytes = fromObject(Arrays.asList(
943 new MockRaftActorContext.MockPayload("A"),
944 new MockRaftActorContext.MockPayload("B"),
945 new MockRaftActorContext.MockPayload("C"),
946 new MockRaftActorContext.MockPayload("D")));
948 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
// Force the actor into the Leader behavior before starting the capture.
950 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
// Start a snapshot capture through the SnapshotManager, then feed the actor the capture reply.
952 raftActorContext.getSnapshotManager().capture(
953 new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
954 new MockRaftActorContext.MockPayload("D")), 1);
956 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
// Simulate the snapshot save failing. The remaining argument(s) of this call are on a line that is
// not visible in this excerpt.
958 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
961 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
962 mockRaftActor.getReplicatedLog().getSnapshotIndex());
// Verifies the notifications sent to the role-change notifier actor by a MockRaftActor created with
// an empty peer map:
//   1. three RoleChanged messages: null -> Follower, Follower -> Candidate, Candidate -> Leader;
//   2. a LeaderStateChanged whose leader id equals the actor's own member id;
//   3. after switching to a custom Follower behavior: a LeaderStateChanged with a null leader id
//      and a RoleChanged Leader -> Follower;
//   4. after the custom behavior handles a message: a LeaderStateChanged carrying the new leader id.
969 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
970 new JavaTestKit(getSystem()) {{
// The notifier is a MessageCollectorActor so the test can inspect what it received.
971 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
972 Props.create(MessageCollectorActor.class));
973 MessageCollectorActor.waitUntilReady(notifierActor);
975 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
976 long heartBeatInterval = 100;
977 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
978 config.setElectionTimeoutFactor(20);
980 String persistenceId = factory.generateActorId("notifier-");
982 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
983 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
984 new NonPersistentDataProvider()), persistenceId);
// Expect exactly the three role transitions checked one by one below.
986 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
989 // check if the notifier got a role change from null to Follower
990 RoleChanged raftRoleChanged = matches.get(0);
991 assertEquals(persistenceId, raftRoleChanged.getMemberId());
992 assertNull(raftRoleChanged.getOldRole());
993 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
995 // check if the notifier got a role change from Follower to Candidate
996 raftRoleChanged = matches.get(1);
997 assertEquals(persistenceId, raftRoleChanged.getMemberId());
998 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
999 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
1001 // check if the notifier got a role change from Candidate to Leader
1002 raftRoleChanged = matches.get(2);
1003 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1004 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
1005 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
// The leader announced to the notifier must be this actor itself (raftRoleChanged still refers to
// the Candidate -> Leader message, whose member id was asserted to be persistenceId).
1007 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
1008 notifierActor, LeaderStateChanged.class);
1010 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
// Drop everything collected so far so the next expectFirstMatching calls only see new messages.
1012 notifierActor.underlyingActor().clear();
// Install a Follower whose handleMessage assigns leaderId = "new-leader" (the rest of the override
// is not visible in this excerpt).
1014 MockRaftActor raftActor = raftActorRef.underlyingActor();
1015 final String newLeaderId = "new-leader";
1016 Follower follower = new Follower(raftActor.getRaftActorContext()) {
1018 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
1019 leaderId = newLeaderId;
1024 raftActor.changeCurrentBehavior(follower);
// Switching behavior alone must report "no leader" plus the Leader -> Follower role change.
// NOTE(review): assertEquals(null, ...) below could be assertNull(...), as used earlier in this test.
1026 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1027 assertEquals(persistenceId, leaderStateChange.getMemberId());
1028 assertEquals(null, leaderStateChange.getLeaderId());
1030 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
1031 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
1032 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1034 notifierActor.underlyingActor().clear();
// Any message will do here; the test only needs the custom behavior to run and set its leader id.
1036 raftActor.handleCommand("any");
1038 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
1039 assertEquals(persistenceId, leaderStateChange.getMemberId());
1040 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
// Verifies the RoleChanged notifications for a MockRaftActor created with one configured peer
// ("leader" -> "fake/path"): exactly two are expected, null -> Follower and Follower -> Candidate.
1045 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
1046 new JavaTestKit(getSystem()) {{
// The notifier is a MessageCollectorActor so the test can inspect what it received.
1047 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
1048 MessageCollectorActor.waitUntilReady(notifierActor);
1050 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1051 long heartBeatInterval = 100;
1052 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
1053 config.setElectionTimeoutFactor(1);
1055 String persistenceId = factory.generateActorId("notifier-");
1057 factory.createActor(MockRaftActor.props(persistenceId,
1058 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
// Poll the collector up to 5000 / heartBeatInterval times (50 with the 100 ms interval above),
// sleeping one heartbeat interval between polls.
// NOTE(review): the visible condition looks for 3 messages, while the assertion after the loop
// expects exactly 2. If 2 is the steady state, the loop presumably never exits early and always
// uses its full budget - confirm whether that wait is deliberate (to prove no third role change
// arrives) or a leftover from the no-peers variant of this test.
1060 List<RoleChanged> matches = null;
1061 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
1062 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
1063 assertNotNull(matches);
1064 if(matches.size() == 3) {
1067 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
1070 assertEquals(2, matches.size());
1072 // check if the notifier got a role change from null to Follower
1073 RoleChanged raftRoleChanged = matches.get(0);
1074 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1075 assertNull(raftRoleChanged.getOldRole());
1076 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
1078 // check if the notifier got a role change from Follower to Candidate
1079 raftRoleChanged = matches.get(1);
1080 assertEquals(persistenceId, raftRoleChanged.getMemberId());
1081 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
1082 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
// Walks a leader (one follower) through a snapshot capture while AppendEntriesReply messages keep
// arriving, asserting the replicated log size at every step:
//   - while the capture is in progress the log keeps all 8 entries;
//   - after persist + commit on the SnapshotManager the log holds 3 entries with last index 7;
//   - after one more append and a reply for index 7 the log holds 2 entries with last index 8.
1088 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
1089 new JavaTestKit(getSystem()) {
1091 String persistenceId = factory.generateActorId("leader-");
1092 String follower1Id = factory.generateActorId("follower-");
// The follower is only a message sink; replies from it are injected by hand below.
1094 ActorRef followerActor1 =
1095 factory.createActor(Props.create(MessageCollectorActor.class));
1097 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals - presumably so neither timer fires while the test runs (confirm).
1098 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1099 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1101 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1103 Map<String, String> peerAddresses = new HashMap<>();
1104 peerAddresses.put(follower1Id, followerActor1.path().toString());
1106 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1107 MockRaftActor.props(persistenceId, peerAddresses,
1108 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1110 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Preset commit index / lastApplied to 4 and the term information to (1, persistenceId).
1112 leaderActor.getRaftActorContext().setCommitIndex(4);
1113 leaderActor.getRaftActorContext().setLastApplied(4);
1114 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1116 leaderActor.waitForInitializeBehaviorComplete();
1118 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1120 Leader leader = new Leader(leaderActor.getRaftActorContext());
1121 leaderActor.setCurrentBehavior(leader);
1122 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1124 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1125 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
1127 assertEquals(8, leaderActor.getReplicatedLog().size());
// Start the capture; the delegate must be asked to create the snapshot, and the log is untouched.
1129 leaderActor.getRaftActorContext().getSnapshotManager()
1130 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1131 new MockRaftActorContext.MockPayload("x")), 4);
1133 verify(leaderActor.delegate).createSnapshot();
1135 assertEquals(8, leaderActor.getReplicatedLog().size());
1137 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1138 //fake snapshot on index 5
1139 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
1141 assertEquals(8, leaderActor.getReplicatedLog().size());
1143 //fake snapshot on index 6
1144 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1145 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
1146 assertEquals(8, leaderActor.getReplicatedLog().size());
1148 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1150 assertEquals(8, leaderActor.getReplicatedLog().size());
1152 ByteString snapshotBytes = fromObject(Arrays.asList(
1153 new MockRaftActorContext.MockPayload("foo-0"),
1154 new MockRaftActorContext.MockPayload("foo-1"),
1155 new MockRaftActorContext.MockPayload("foo-2"),
1156 new MockRaftActorContext.MockPayload("foo-3"),
1157 new MockRaftActorContext.MockPayload("foo-4")));
// Here persist(...) is called on the SnapshotManager directly (with a NonPersistentDataProvider)
// instead of sending a CaptureSnapshotReply to the actor.
1159 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
1160 , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
1162 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1164 // The commit is needed to complete the snapshot creation process
1165 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1167 // capture snapshot reply should remove the snapshotted entries only
1168 assertEquals(3, leaderActor.getReplicatedLog().size());
1169 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1171 // add another non-replicated entry
1172 leaderActor.getReplicatedLog().append(
1173 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1175 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1176 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
1177 assertEquals(2, leaderActor.getReplicatedLog().size());
1178 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
// Walks a follower through a snapshot capture while AppendEntries messages keep arriving,
// asserting the replicated log size at every step:
//   - while the capture is in progress the log grows from 6 to 7 to 8 entries;
//   - after CaptureSnapshotReply + commit on the SnapshotManager it holds 3 entries, last index 7;
//   - after one more AppendEntries it holds 2 entries.
1185 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1186 new JavaTestKit(getSystem()) {
1188 String persistenceId = factory.generateActorId("follower-");
1189 String leaderId = factory.generateActorId("leader-");
// The leader is only a message sink; its AppendEntries messages are injected by hand below.
1192 ActorRef leaderActor1 =
1193 factory.createActor(Props.create(MessageCollectorActor.class));
1195 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals - presumably so neither timer fires while the test runs (confirm).
1196 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1197 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1199 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1201 Map<String, String> peerAddresses = new HashMap<>();
1202 peerAddresses.put(leaderId, leaderActor1.path().toString());
1204 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1205 MockRaftActor.props(persistenceId, peerAddresses,
1206 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1208 MockRaftActor followerActor = mockActorRef.underlyingActor();
// Preset commit index / lastApplied to 4 and the term information to (1, persistenceId).
1209 followerActor.getRaftActorContext().setCommitIndex(4);
1210 followerActor.getRaftActorContext().setLastApplied(4);
1211 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1213 followerActor.waitForInitializeBehaviorComplete();
1216 Follower follower = new Follower(followerActor.getRaftActorContext());
1217 followerActor.setCurrentBehavior(follower);
1218 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1220 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1221 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1222 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1224 // log has indices 0-5
1225 assertEquals(6, followerActor.getReplicatedLog().size());
// Start the capture; the delegate must be asked to create the snapshot, and the log is untouched.
1228 followerActor.getRaftActorContext().getSnapshotManager().capture(
1229 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
1230 new MockRaftActorContext.MockPayload("D")), 4);
1232 verify(followerActor.delegate).createSnapshot();
1234 assertEquals(6, followerActor.getReplicatedLog().size());
// The expression that wraps each entry into the "entries" list is on lines not visible in this
// excerpt; only the entry construction is shown.
1236 //fake snapshot on index 6
1237 List<ReplicatedLogEntry> entries =
1239 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1240 new MockRaftActorContext.MockPayload("foo-6"))
1242 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
1243 assertEquals(7, followerActor.getReplicatedLog().size());
1245 //fake snapshot on index 7
1246 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1250 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1251 new MockRaftActorContext.MockPayload("foo-7"))
1253 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
1254 assertEquals(8, followerActor.getReplicatedLog().size());
1256 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
// Complete the capture by delivering the serialized snapshot state to the actor.
1259 ByteString snapshotBytes = fromObject(Arrays.asList(
1260 new MockRaftActorContext.MockPayload("foo-0"),
1261 new MockRaftActorContext.MockPayload("foo-1"),
1262 new MockRaftActorContext.MockPayload("foo-2"),
1263 new MockRaftActorContext.MockPayload("foo-3"),
1264 new MockRaftActorContext.MockPayload("foo-4")));
1265 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1266 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
1268 // The commit is needed to complete the snapshot creation process
1269 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
1271 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
1272 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1273 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
// NOTE(review): this entry is built with 8 but its payload is labelled "foo-7" (the previous
// entry already used "foo-7") - looks like a copy/paste slip; confirm whether "foo-8" was meant.
1277 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1278 new MockRaftActorContext.MockPayload("foo-7"))
1280 // send an additional entry 8 with leaderCommit = 7
1281 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
1283 // 7 and 8, as lastapplied is 7
1284 assertEquals(2, followerActor.getReplicatedLog().size());
// Exercises a leader with two followers around a snapshot started from a SendHeartBeat:
//   - the log keeps its 5 entries through the follower replies and the heartbeat;
//   - a further reply from the second follower must not shrink the log while that is in progress;
//   - after the CaptureSnapshotReply the manager is no longer capturing and the log is empty;
//   - a late reply from the second follower leaves the log empty.
1291 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1292 new JavaTestKit(getSystem()) {
1294 String persistenceId = factory.generateActorId("leader-");
1295 String follower1Id = factory.generateActorId("follower-");
1296 String follower2Id = factory.generateActorId("follower-");
// Both followers are only message sinks; their replies are injected by hand below.
1298 ActorRef followerActor1 =
1299 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
1300 ActorRef followerActor2 =
1301 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
1303 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals - presumably so neither timer fires on its own while the test runs (confirm).
1304 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1305 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1307 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1309 Map<String, String> peerAddresses = new HashMap<>();
1310 peerAddresses.put(follower1Id, followerActor1.path().toString());
1311 peerAddresses.put(follower2Id, followerActor2.path().toString());
1313 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1314 MockRaftActor.props(persistenceId, peerAddresses,
1315 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1317 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Preset commit index / lastApplied to 9 and the term information to (1, persistenceId).
1318 leaderActor.getRaftActorContext().setCommitIndex(9);
1319 leaderActor.getRaftActorContext().setLastApplied(9);
1320 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1322 leaderActor.waitForInitializeBehaviorComplete();
1324 Leader leader = new Leader(leaderActor.getRaftActorContext());
1325 leaderActor.setCurrentBehavior(leader);
1326 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1328 // create 5 entries in the log
1329 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1330 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1332 //set the snapshot index to 4 , 0 to 4 are snapshotted
1333 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1334 //setting replicatedToAllIndex = 9, for the log to clear
1335 leader.setReplicatedToAllIndex(9);
1336 assertEquals(5, leaderActor.getReplicatedLog().size());
1337 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Reply from the first follower; the log must stay at 5 entries.
1339 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
1340 assertEquals(5, leaderActor.getReplicatedLog().size());
1341 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1343 // set the 2nd follower nextIndex to 1 which has been snapshotted
1344 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
1345 assertEquals(5, leaderActor.getReplicatedLog().size());
1346 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1348 // simulate a real snapshot
1349 leaderActor.onReceiveCommand(new SendHeartBeat());
1350 assertEquals(5, leaderActor.getReplicatedLog().size());
// The failure message reports the actual state and current leader id to ease diagnosis.
1351 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1352 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
1353 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1356 //reply from a slow follower does not initiate a fake snapshot
1357 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
1358 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
// Complete the capture by delivering the serialized snapshot state to the actor.
1360 ByteString snapshotBytes = fromObject(Arrays.asList(
1361 new MockRaftActorContext.MockPayload("foo-0"),
1362 new MockRaftActorContext.MockPayload("foo-1"),
1363 new MockRaftActorContext.MockPayload("foo-2"),
1364 new MockRaftActorContext.MockPayload("foo-3"),
1365 new MockRaftActorContext.MockPayload("foo-4")));
1366 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1367 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1369 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1371 //reply from a slow follower after should not raise errors
1372 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1373 assertEquals(0, leaderActor.getReplicatedLog().size());
// Leader without peers, snapshot batch count 5, four entries already in the log: persisting one
// more entry and then delivering a CaptureSnapshotReply must leave the snapshot index at -1, the
// SnapshotManager not capturing, and the leader's replicatedToAllIndex at -1.
1379 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1380 new JavaTestKit(getSystem()) {{
1381 String persistenceId = factory.generateActorId("leader-");
1382 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals - presumably so neither timer fires while the test runs (confirm).
1383 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1384 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1385 config.setSnapshotBatchCount(5);
1387 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Deliberately empty: this actor has no peers.
1389 Map<String, String> peerAddresses = new HashMap<>();
1391 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1392 MockRaftActor.props(persistenceId, peerAddresses,
1393 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1395 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Preset commit index / lastApplied to 3 and the term information to (1, persistenceId).
1396 leaderActor.getRaftActorContext().setCommitIndex(3);
1397 leaderActor.getRaftActorContext().setLastApplied(3);
1398 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1400 leaderActor.waitForInitializeBehaviorComplete();
// Append four entries (loop variable 0..3 passed as the second constructor argument).
1401 for(int i=0;i< 4;i++) {
1402 leaderActor.getReplicatedLog()
1403 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1404 new MockRaftActorContext.MockPayload("A")));
1407 Leader leader = new Leader(leaderActor.getRaftActorContext());
1408 leaderActor.setCurrentBehavior(leader);
1409 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1411 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1412 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
// Sent through the actor ref rather than onReceiveCommand; the assertions right after assume the
// message has already been handled - presumably TestActorRef delivers synchronously (confirm).
1414 // Now send a CaptureSnapshotReply
1415 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1417 // Trimming log in this scenario is a no-op
1418 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1419 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1420 assertEquals(-1, leader.getReplicatedToAllIndex());
// Leader without peers, snapshot batch count 5, snapshot index and replicatedToAllIndex both preset
// to 3 and no entries appended by the test: persisting one entry and then delivering a
// CaptureSnapshotReply must leave the snapshot index at 3, the SnapshotManager not capturing, and
// the leader's replicatedToAllIndex at 3.
1426 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1427 new JavaTestKit(getSystem()) {{
1428 String persistenceId = factory.generateActorId("leader-");
1429 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
// One-day intervals - presumably so neither timer fires while the test runs (confirm).
1430 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1431 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1432 config.setSnapshotBatchCount(5);
1434 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
// Deliberately empty: this actor has no peers.
1436 Map<String, String> peerAddresses = new HashMap<>();
1438 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1439 MockRaftActor.props(persistenceId, peerAddresses,
1440 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1442 MockRaftActor leaderActor = mockActorRef.underlyingActor();
// Preset commit index, lastApplied and the log's snapshot index to 3; term information to
// (1, persistenceId).
1443 leaderActor.getRaftActorContext().setCommitIndex(3);
1444 leaderActor.getRaftActorContext().setLastApplied(3);
1445 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1446 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1448 leaderActor.waitForInitializeBehaviorComplete();
1449 Leader leader = new Leader(leaderActor.getRaftActorContext());
1450 leaderActor.setCurrentBehavior(leader);
1451 leader.setReplicatedToAllIndex(3);
1452 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1454 // Persist another entry (this will cause a CaptureSnapshot to be triggered)
1455 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
// Sent through the actor ref rather than onReceiveCommand; the assertions right after assume the
// message has already been handled - presumably TestActorRef delivers synchronously (confirm).
1457 // Now send a CaptureSnapshotReply
1458 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1460 // Trimming log in this scenario is a no-op
1461 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1462 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1463 assertEquals(3, leader.getReplicatedToAllIndex());
1468 private ByteString fromObject(Object snapshot) throws Exception {
1469 ByteArrayOutputStream b = null;
1470 ObjectOutputStream o = null;
1472 b = new ByteArrayOutputStream();
1473 o = new ObjectOutputStream(b);
1474 o.writeObject(snapshot);
1475 byte[] snapshotBytes = b.toByteArray();
1476 return ByteString.copyFrom(snapshotBytes);