1 package org.opendaylight.controller.cluster.raft;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import akka.actor.Props;
7 import akka.actor.Terminated;
8 import akka.japi.Creator;
9 import akka.japi.Procedure;
10 import akka.pattern.Patterns;
11 import akka.persistence.RecoveryCompleted;
12 import akka.persistence.SaveSnapshotFailure;
13 import akka.persistence.SaveSnapshotSuccess;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotOffer;
16 import akka.persistence.SnapshotSelectionCriteria;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.google.protobuf.ByteString;
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import org.junit.After;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
43 import org.opendaylight.controller.cluster.notifications.RoleChanged;
44 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
45 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
46 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
47 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
50 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
51 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
52 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
53 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
57 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
58 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
59 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
60 import scala.concurrent.Await;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.Duration;
63 import scala.concurrent.duration.FiniteDuration;
65 import static org.junit.Assert.assertEquals;
66 import static org.junit.Assert.assertFalse;
67 import static org.junit.Assert.assertNotEquals;
68 import static org.junit.Assert.assertNotNull;
69 import static org.junit.Assert.assertNull;
70 import static org.junit.Assert.assertTrue;
71 import static org.mockito.Matchers.any;
72 import static org.mockito.Matchers.anyObject;
73 import static org.mockito.Matchers.eq;
74 import static org.mockito.Mockito.doReturn;
75 import static org.mockito.Mockito.mock;
76 import static org.mockito.Mockito.times;
77 import static org.mockito.Mockito.verify;
79 public class RaftActorTest extends AbstractActorTest {
83 public void tearDown() {
84 MockAkkaJournal.clearJournal();
85 MockSnapshotStore.setMockSnapshot(null);
88 public static class MockRaftActor extends RaftActor {
90 private final DataPersistenceProvider dataPersistenceProvider;
91 private final RaftActor delegate;
92 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
93 private final List<Object> state;
94 private ActorRef roleChangeNotifier;
95 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
97 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
98 private static final long serialVersionUID = 1L;
99 private final Map<String, String> peerAddresses;
100 private final String id;
101 private final Optional<ConfigParams> config;
102 private final DataPersistenceProvider dataPersistenceProvider;
103 private final ActorRef roleChangeNotifier;
105 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
106 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
107 ActorRef roleChangeNotifier) {
108 this.peerAddresses = peerAddresses;
110 this.config = config;
111 this.dataPersistenceProvider = dataPersistenceProvider;
112 this.roleChangeNotifier = roleChangeNotifier;
116 public MockRaftActor create() throws Exception {
117 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
118 dataPersistenceProvider);
119 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
120 return mockRaftActor;
124 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
125 DataPersistenceProvider dataPersistenceProvider) {
126 super(id, peerAddresses, config);
127 state = new ArrayList<>();
128 this.delegate = mock(RaftActor.class);
129 if(dataPersistenceProvider == null){
130 this.dataPersistenceProvider = new PersistentDataProvider();
132 this.dataPersistenceProvider = dataPersistenceProvider;
136 public void waitForRecoveryComplete() {
138 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
139 } catch (InterruptedException e) {
144 public void waitForInitializeBehaviorComplete() {
146 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
147 } catch (InterruptedException e) {
152 public List<Object> getState() {
156 public static Props props(final String id, final Map<String, String> peerAddresses,
157 Optional<ConfigParams> config){
158 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
161 public static Props props(final String id, final Map<String, String> peerAddresses,
162 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
163 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
166 public static Props props(final String id, final Map<String, String> peerAddresses,
167 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
168 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
171 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
172 delegate.applyState(clientActor, identifier, data);
173 LOG.info("applyState called");
177 protected void startLogRecoveryBatch(int maxBatchSize) {
181 protected void appendRecoveredLogEntry(Payload data) {
186 protected void applyCurrentLogRecoveryBatch() {
190 protected void onRecoveryComplete() {
191 delegate.onRecoveryComplete();
192 recoveryComplete.countDown();
196 protected void initializeBehavior() {
197 super.initializeBehavior();
198 initializeBehaviorComplete.countDown();
202 protected void applyRecoverySnapshot(byte[] bytes) {
203 delegate.applyRecoverySnapshot(bytes);
205 Object data = toObject(bytes);
206 if (data instanceof List) {
207 state.addAll((List<?>) data);
209 } catch (Exception e) {
214 @Override protected void createSnapshot() {
215 delegate.createSnapshot();
218 @Override protected void applySnapshot(byte [] snapshot) {
219 delegate.applySnapshot(snapshot);
222 @Override protected void onStateChanged() {
223 delegate.onStateChanged();
227 protected DataPersistenceProvider persistence() {
228 return this.dataPersistenceProvider;
232 protected Optional<ActorRef> getRoleChangeNotifier() {
233 return Optional.fromNullable(roleChangeNotifier);
236 @Override public String persistenceId() {
240 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
242 ByteArrayInputStream bis = null;
243 ObjectInputStream ois = null;
245 bis = new ByteArrayInputStream(bs);
246 ois = new ObjectInputStream(bis);
247 obj = ois.readObject();
259 public ReplicatedLog getReplicatedLog(){
260 return this.getRaftActorContext().getReplicatedLog();
266 private static class RaftActorTestKit extends JavaTestKit {
267 private final ActorRef raftActor;
269 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
272 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
273 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
278 public ActorRef getRaftActor() {
282 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
283 // Wait for a specific log message to show up
285 new JavaTestKit.EventFilter<Boolean>(logEventClass
288 protected Boolean run() {
291 }.from(raftActor.path().toString())
293 .occurrences(1).exec();
298 protected void waitUntilLeader(){
299 waitUntilLeader(raftActor);
302 protected void waitUntilLeader(ActorRef actorRef) {
303 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
304 for(int i = 0; i < 20 * 5; i++) {
305 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
307 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
308 if(resp.getLeaderActor() != null) {
311 } catch(TimeoutException e) {
312 } catch(Exception e) {
313 System.err.println("FindLeader threw ex");
318 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
321 Assert.fail("Leader not found for actorRef " + actorRef.path());
328 public void testConstruction() {
329 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
333 public void testFindLeaderWhenLeaderIsSelf(){
334 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
335 kit.waitUntilLeader();
339 public void testRaftActorRecovery() throws Exception {
340 new JavaTestKit(getSystem()) {{
341 String persistenceId = "follower10";
343 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
344 // Set the heartbeat interval high to essentially disable election otherwise the test
345 // may fail if the actor is switched to Leader and the commitIndex is set to the last
347 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
349 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
350 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
352 watch(followerActor);
354 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
355 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
356 new MockRaftActorContext.MockPayload("E"));
357 snapshotUnappliedEntries.add(entry1);
359 int lastAppliedDuringSnapshotCapture = 3;
360 int lastIndexDuringSnapshotCapture = 4;
362 // 4 messages as part of snapshot, which are applied to state
363 ByteString snapshotBytes = fromObject(Arrays.asList(
364 new MockRaftActorContext.MockPayload("A"),
365 new MockRaftActorContext.MockPayload("B"),
366 new MockRaftActorContext.MockPayload("C"),
367 new MockRaftActorContext.MockPayload("D")));
369 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
370 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
371 lastAppliedDuringSnapshotCapture, 1);
372 MockSnapshotStore.setMockSnapshot(snapshot);
373 MockSnapshotStore.setPersistenceId(persistenceId);
375 // add more entries after snapshot is taken
376 List<ReplicatedLogEntry> entries = new ArrayList<>();
377 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
378 new MockRaftActorContext.MockPayload("F"));
379 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
380 new MockRaftActorContext.MockPayload("G"));
381 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
382 new MockRaftActorContext.MockPayload("H"));
387 int lastAppliedToState = 5;
390 MockAkkaJournal.addToJournal(5, entry2);
391 // 2 entries are applied to state besides the 4 entries in snapshot
392 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
393 MockAkkaJournal.addToJournal(7, entry3);
394 MockAkkaJournal.addToJournal(8, entry4);
397 followerActor.tell(PoisonPill.getInstance(), null);
398 expectMsgClass(duration("5 seconds"), Terminated.class);
400 unwatch(followerActor);
402 //reinstate the actor
403 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
404 MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
405 Optional.<ConfigParams>of(config)));
407 ref.underlyingActor().waitForRecoveryComplete();
409 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
410 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
411 context.getReplicatedLog().size());
412 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
413 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
414 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
415 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
420 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
421 * process recovery messages
427 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
428 new JavaTestKit(getSystem()) {
430 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
432 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
434 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
436 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
437 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
439 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
441 // Wait for akka's recovery to complete so it doesn't interfere.
442 mockRaftActor.waitForRecoveryComplete();
444 ByteString snapshotBytes = fromObject(Arrays.asList(
445 new MockRaftActorContext.MockPayload("A"),
446 new MockRaftActorContext.MockPayload("B"),
447 new MockRaftActorContext.MockPayload("C"),
448 new MockRaftActorContext.MockPayload("D")));
450 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
451 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
453 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
455 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
457 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
459 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
461 assertEquals("add replicated log entry", 1, replicatedLog.size());
463 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
465 assertEquals("add replicated log entry", 2, replicatedLog.size());
467 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
469 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
471 // The snapshot had 4 items + we added 2 more items during the test
472 // We start removing from 5 and we should get 1 item in the replicated log
473 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
475 assertEquals("remove log entries", 1, replicatedLog.size());
477 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
479 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
480 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
482 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
484 mockActorRef.tell(PoisonPill.getInstance(), getRef());
490 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
491 * not process recovery messages
496 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
497 new JavaTestKit(getSystem()) {
499 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
501 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
503 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
505 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
506 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
508 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
510 // Wait for akka's recovery to complete so it doesn't interfere.
511 mockRaftActor.waitForRecoveryComplete();
513 ByteString snapshotBytes = fromObject(Arrays.asList(
514 new MockRaftActorContext.MockPayload("A"),
515 new MockRaftActorContext.MockPayload("B"),
516 new MockRaftActorContext.MockPayload("C"),
517 new MockRaftActorContext.MockPayload("D")));
519 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
520 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
522 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
524 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
526 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
528 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
530 assertEquals("add replicated log entry", 0, replicatedLog.size());
532 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
534 assertEquals("add replicated log entry", 0, replicatedLog.size());
536 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
538 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
540 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
542 assertEquals("remove log entries", 0, replicatedLog.size());
544 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
546 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
547 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
549 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
551 mockActorRef.tell(PoisonPill.getInstance(), getRef());
557 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
558 new JavaTestKit(getSystem()) {
560 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
562 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
564 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
566 CountDownLatch persistLatch = new CountDownLatch(1);
567 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
568 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
570 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
571 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
573 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
575 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
577 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
579 mockActorRef.tell(PoisonPill.getInstance(), getRef());
586 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
587 new JavaTestKit(getSystem()) {
589 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
591 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
593 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
595 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
597 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
598 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
600 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
602 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
604 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
606 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
608 mockActorRef.tell(PoisonPill.getInstance(), getRef());
615 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
616 new JavaTestKit(getSystem()) {
618 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
620 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
622 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
624 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
626 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
627 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
629 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
631 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
633 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
635 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
637 mockActorRef.tell(PoisonPill.getInstance(), getRef());
644 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
645 new JavaTestKit(getSystem()) {
647 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
649 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
651 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
653 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
655 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
656 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
658 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
660 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
662 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
664 mockActorRef.tell(PoisonPill.getInstance(), getRef());
671 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
672 new JavaTestKit(getSystem()) {
674 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
676 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
678 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
680 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
682 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
683 MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
684 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
686 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
688 ByteString snapshotBytes = fromObject(Arrays.asList(
689 new MockRaftActorContext.MockPayload("A"),
690 new MockRaftActorContext.MockPayload("B"),
691 new MockRaftActorContext.MockPayload("C"),
692 new MockRaftActorContext.MockPayload("D")));
694 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
696 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
698 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
700 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
702 verify(dataPersistenceProvider).saveSnapshot(anyObject());
704 mockActorRef.tell(PoisonPill.getInstance(), getRef());
711 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
712 new JavaTestKit(getSystem()) {
714 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
716 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
718 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
720 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
722 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
723 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
725 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
727 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
728 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
729 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
730 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
731 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
733 ByteString snapshotBytes = fromObject(Arrays.asList(
734 new MockRaftActorContext.MockPayload("A"),
735 new MockRaftActorContext.MockPayload("B"),
736 new MockRaftActorContext.MockPayload("C"),
737 new MockRaftActorContext.MockPayload("D")));
739 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
740 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
742 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
744 verify(mockRaftActor.delegate).createSnapshot();
746 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
748 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
750 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
752 verify(dataPersistenceProvider).deleteMessages(100);
754 assertEquals(2, mockRaftActor.getReplicatedLog().size());
756 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
757 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
759 // Index 2 will not be in the log because it was removed due to snapshotting
760 assertNull(mockRaftActor.getReplicatedLog().get(2));
762 mockActorRef.tell(PoisonPill.getInstance(), getRef());
769 public void testApplyState() throws Exception {
771 new JavaTestKit(getSystem()) {
773 String persistenceId = "testApplyState";
775 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
777 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
779 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
781 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
782 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
784 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
786 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
787 new MockRaftActorContext.MockPayload("F"));
789 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
791 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
793 mockActorRef.tell(PoisonPill.getInstance(), getRef());
800 public void testApplySnapshot() throws Exception {
801 new JavaTestKit(getSystem()) {
803 String persistenceId = "testApplySnapshot";
805 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
807 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
809 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
811 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
812 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
814 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
816 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
818 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
819 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
820 oldReplicatedLog.append(
821 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
822 mock(Payload.class)));
824 ByteString snapshotBytes = fromObject(Arrays.asList(
825 new MockRaftActorContext.MockPayload("A"),
826 new MockRaftActorContext.MockPayload("B"),
827 new MockRaftActorContext.MockPayload("C"),
828 new MockRaftActorContext.MockPayload("D")));
830 Snapshot snapshot = mock(Snapshot.class);
832 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
834 doReturn(3L).when(snapshot).getLastAppliedIndex();
836 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
838 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
840 assertTrue("The replicatedLog should have changed",
841 oldReplicatedLog != mockRaftActor.getReplicatedLog());
843 assertEquals("lastApplied should be same as in the snapshot",
844 (Long) 3L, mockRaftActor.getLastApplied());
846 assertEquals(0, mockRaftActor.getReplicatedLog().size());
848 mockActorRef.tell(PoisonPill.getInstance(), getRef());
855 public void testSaveSnapshotFailure() throws Exception {
856 new JavaTestKit(getSystem()) {
858 String persistenceId = "testSaveSnapshotFailure";
860 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
862 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
864 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
866 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
867 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
869 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
871 ByteString snapshotBytes = fromObject(Arrays.asList(
872 new MockRaftActorContext.MockPayload("A"),
873 new MockRaftActorContext.MockPayload("B"),
874 new MockRaftActorContext.MockPayload("C"),
875 new MockRaftActorContext.MockPayload("D")));
877 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
879 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
881 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
883 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
885 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
888 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
889 mockRaftActor.getReplicatedLog().getSnapshotIndex());
891 mockActorRef.tell(PoisonPill.getInstance(), getRef());
898 public void testRaftRoleChangeNotifier() throws Exception {
899 new JavaTestKit(getSystem()) {{
900 ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
901 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
902 String id = "testRaftRoleChangeNotifier";
904 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
905 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
907 // sleeping for a minimum of 2 seconds, if it spans more its fine.
908 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
910 List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
911 assertNotNull(matches);
912 assertEquals(3, matches.size());
914 // check if the notifier got a role change from null to Follower
915 RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
916 assertEquals(id, raftRoleChanged.getMemberId());
917 assertNull(raftRoleChanged.getOldRole());
918 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
920 // check if the notifier got a role change from Follower to Candidate
921 raftRoleChanged = (RoleChanged) matches.get(1);
922 assertEquals(id, raftRoleChanged.getMemberId());
923 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
924 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
926 // check if the notifier got a role change from Candidate to Leader
927 raftRoleChanged = (RoleChanged) matches.get(2);
928 assertEquals(id, raftRoleChanged.getMemberId());
929 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
930 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
935 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
936 new JavaTestKit(getSystem()) {
938 String persistenceId = "leader1";
940 ActorRef followerActor1 =
941 getSystem().actorOf(Props.create(MessageCollectorActor.class));
943 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
944 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
945 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
947 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
949 Map<String, String> peerAddresses = new HashMap<>();
950 peerAddresses.put("follower-1", followerActor1.path().toString());
952 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
953 MockRaftActor.props(persistenceId, peerAddresses,
954 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
956 MockRaftActor leaderActor = mockActorRef.underlyingActor();
957 leaderActor.getRaftActorContext().setCommitIndex(4);
958 leaderActor.getRaftActorContext().setLastApplied(4);
959 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
961 leaderActor.waitForInitializeBehaviorComplete();
963 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
965 Leader leader = new Leader(leaderActor.getRaftActorContext());
966 leaderActor.setCurrentBehavior(leader);
967 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
969 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
970 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
972 assertEquals(8, leaderActor.getReplicatedLog().size());
974 leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
975 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
976 verify(leaderActor.delegate).createSnapshot();
978 assertEquals(8, leaderActor.getReplicatedLog().size());
980 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
981 //fake snapshot on index 5
982 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
984 assertEquals(8, leaderActor.getReplicatedLog().size());
986 //fake snapshot on index 6
987 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
988 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
989 assertEquals(8, leaderActor.getReplicatedLog().size());
991 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
993 assertEquals(8, leaderActor.getReplicatedLog().size());
995 ByteString snapshotBytes = fromObject(Arrays.asList(
996 new MockRaftActorContext.MockPayload("foo-0"),
997 new MockRaftActorContext.MockPayload("foo-1"),
998 new MockRaftActorContext.MockPayload("foo-2"),
999 new MockRaftActorContext.MockPayload("foo-3"),
1000 new MockRaftActorContext.MockPayload("foo-4")));
1001 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1002 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1004 // capture snapshot reply should remove the snapshotted entries only
1005 assertEquals(3, leaderActor.getReplicatedLog().size());
1006 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1008 // add another non-replicated entry
1009 leaderActor.getReplicatedLog().append(
1010 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1012 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1013 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
1014 assertEquals(2, leaderActor.getReplicatedLog().size());
1015 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1017 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1024 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1025 new JavaTestKit(getSystem()) {
1027 String persistenceId = "follower1";
1029 ActorRef leaderActor1 =
1030 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1032 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1033 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1034 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1036 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1038 Map<String, String> peerAddresses = new HashMap<>();
1039 peerAddresses.put("leader", leaderActor1.path().toString());
1041 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1042 MockRaftActor.props(persistenceId, peerAddresses,
1043 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1045 MockRaftActor followerActor = mockActorRef.underlyingActor();
1046 followerActor.getRaftActorContext().setCommitIndex(4);
1047 followerActor.getRaftActorContext().setLastApplied(4);
1048 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1050 followerActor.waitForInitializeBehaviorComplete();
1052 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1053 Follower follower = new Follower(followerActor.getRaftActorContext());
1054 followerActor.setCurrentBehavior(follower);
1055 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1057 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1058 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1060 // log as indices 0-5
1061 assertEquals(6, followerActor.getReplicatedLog().size());
1064 followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
1065 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1066 verify(followerActor.delegate).createSnapshot();
1068 assertEquals(6, followerActor.getReplicatedLog().size());
1070 //fake snapshot on index 6
1071 List<ReplicatedLogEntry> entries =
1073 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1074 new MockRaftActorContext.MockPayload("foo-6"))
1076 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
1077 assertEquals(7, followerActor.getReplicatedLog().size());
1079 //fake snapshot on index 7
1080 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1084 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1085 new MockRaftActorContext.MockPayload("foo-7"))
1087 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
1088 assertEquals(8, followerActor.getReplicatedLog().size());
1090 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1093 ByteString snapshotBytes = fromObject(Arrays.asList(
1094 new MockRaftActorContext.MockPayload("foo-0"),
1095 new MockRaftActorContext.MockPayload("foo-1"),
1096 new MockRaftActorContext.MockPayload("foo-2"),
1097 new MockRaftActorContext.MockPayload("foo-3"),
1098 new MockRaftActorContext.MockPayload("foo-4")));
1099 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1100 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1102 // capture snapshot reply should remove the snapshotted entries only
1103 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1104 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1108 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1109 new MockRaftActorContext.MockPayload("foo-7"))
1111 // send an additional entry 8 with leaderCommit = 7
1112 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
1114 // 7 and 8, as lastapplied is 7
1115 assertEquals(2, followerActor.getReplicatedLog().size());
1117 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1124 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1125 new JavaTestKit(getSystem()) {
1127 String persistenceId = "leader1";
1129 ActorRef followerActor1 =
1130 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1131 ActorRef followerActor2 =
1132 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1134 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1135 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1136 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1138 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1140 Map<String, String> peerAddresses = new HashMap<>();
1141 peerAddresses.put("follower-1", followerActor1.path().toString());
1142 peerAddresses.put("follower-2", followerActor2.path().toString());
1144 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1145 MockRaftActor.props(persistenceId, peerAddresses,
1146 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1148 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1149 leaderActor.getRaftActorContext().setCommitIndex(9);
1150 leaderActor.getRaftActorContext().setLastApplied(9);
1151 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1153 leaderActor.waitForInitializeBehaviorComplete();
1155 Leader leader = new Leader(leaderActor.getRaftActorContext());
1156 leaderActor.setCurrentBehavior(leader);
1157 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1159 // create 5 entries in the log
1160 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1161 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1162 //set the snapshot index to 4 , 0 to 4 are snapshotted
1163 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1164 assertEquals(5, leaderActor.getReplicatedLog().size());
1166 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1));
1167 assertEquals(5, leaderActor.getReplicatedLog().size());
1169 // set the 2nd follower nextIndex to 1 which has been snapshotted
1170 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1));
1171 assertEquals(5, leaderActor.getReplicatedLog().size());
1173 // simulate a real snapshot
1174 leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
1175 assertEquals(5, leaderActor.getReplicatedLog().size());
1176 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1178 //reply from a slow follower does not initiate a fake snapshot
1179 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
1180 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1182 ByteString snapshotBytes = fromObject(Arrays.asList(
1183 new MockRaftActorContext.MockPayload("foo-0"),
1184 new MockRaftActorContext.MockPayload("foo-1"),
1185 new MockRaftActorContext.MockPayload("foo-2"),
1186 new MockRaftActorContext.MockPayload("foo-3"),
1187 new MockRaftActorContext.MockPayload("foo-4")));
1188 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1189 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1191 assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
1193 //reply from a slow follower after should not raise errors
1194 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1));
1195 assertEquals(0, leaderActor.getReplicatedLog().size());
1197 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1205 private ByteString fromObject(Object snapshot) throws Exception {
1206 ByteArrayOutputStream b = null;
1207 ObjectOutputStream o = null;
1209 b = new ByteArrayOutputStream();
1210 o = new ObjectOutputStream(b);
1211 o.writeObject(snapshot);
1212 byte[] snapshotBytes = b.toByteArray();
1213 return ByteString.copyFrom(snapshotBytes);