1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotEquals;
6 import static org.junit.Assert.assertNotNull;
7 import static org.junit.Assert.assertNull;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Matchers.any;
10 import static org.mockito.Matchers.anyObject;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSystem;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Terminated;
21 import akka.japi.Creator;
22 import akka.japi.Procedure;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotMetadata;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.testkit.JavaTestKit;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import com.google.protobuf.ByteString;
37 import java.io.ByteArrayInputStream;
38 import java.io.ByteArrayOutputStream;
39 import java.io.IOException;
40 import java.io.ObjectInputStream;
41 import java.io.ObjectOutputStream;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.List;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import org.junit.After;
52 import org.junit.Assert;
53 import org.junit.Test;
54 import org.opendaylight.controller.cluster.DataPersistenceProvider;
55 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
56 import org.opendaylight.controller.cluster.notifications.RoleChanged;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
60 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
61 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
62 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
63 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
64 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
65 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
66 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
67 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
68 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
69 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
70 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
71 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
72 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
73 import scala.concurrent.Await;
74 import scala.concurrent.Future;
75 import scala.concurrent.duration.Duration;
76 import scala.concurrent.duration.FiniteDuration;
78 public class RaftActorTest extends AbstractActorTest {
82 public void tearDown() {
83 MockAkkaJournal.clearJournal();
84 MockSnapshotStore.setMockSnapshot(null);
87 public static class MockRaftActor extends RaftActor {
89 private final DataPersistenceProvider dataPersistenceProvider;
90 private final RaftActor delegate;
91 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
92 private final List<Object> state;
93 private ActorRef roleChangeNotifier;
94 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
96 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
97 private static final long serialVersionUID = 1L;
98 private final Map<String, String> peerAddresses;
99 private final String id;
100 private final Optional<ConfigParams> config;
101 private final DataPersistenceProvider dataPersistenceProvider;
102 private final ActorRef roleChangeNotifier;
104 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
105 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
106 ActorRef roleChangeNotifier) {
107 this.peerAddresses = peerAddresses;
109 this.config = config;
110 this.dataPersistenceProvider = dataPersistenceProvider;
111 this.roleChangeNotifier = roleChangeNotifier;
115 public MockRaftActor create() throws Exception {
116 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
117 dataPersistenceProvider);
118 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
119 return mockRaftActor;
123 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
124 DataPersistenceProvider dataPersistenceProvider) {
125 super(id, peerAddresses, config);
126 state = new ArrayList<>();
127 this.delegate = mock(RaftActor.class);
128 if(dataPersistenceProvider == null){
129 this.dataPersistenceProvider = new PersistentDataProvider();
131 this.dataPersistenceProvider = dataPersistenceProvider;
135 public void waitForRecoveryComplete() {
137 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
138 } catch (InterruptedException e) {
143 public void waitForInitializeBehaviorComplete() {
145 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
146 } catch (InterruptedException e) {
151 public List<Object> getState() {
155 public static Props props(final String id, final Map<String, String> peerAddresses,
156 Optional<ConfigParams> config){
157 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
160 public static Props props(final String id, final Map<String, String> peerAddresses,
161 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
162 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
165 public static Props props(final String id, final Map<String, String> peerAddresses,
166 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
167 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
170 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
171 delegate.applyState(clientActor, identifier, data);
172 LOG.info("applyState called");
176 protected void startLogRecoveryBatch(int maxBatchSize) {
180 protected void appendRecoveredLogEntry(Payload data) {
185 protected void applyCurrentLogRecoveryBatch() {
189 protected void onRecoveryComplete() {
190 delegate.onRecoveryComplete();
191 recoveryComplete.countDown();
195 protected void initializeBehavior() {
196 super.initializeBehavior();
197 initializeBehaviorComplete.countDown();
201 protected void applyRecoverySnapshot(byte[] bytes) {
202 delegate.applyRecoverySnapshot(bytes);
204 Object data = toObject(bytes);
205 if (data instanceof List) {
206 state.addAll((List<?>) data);
208 } catch (Exception e) {
213 @Override protected void createSnapshot() {
214 delegate.createSnapshot();
217 @Override protected void applySnapshot(byte [] snapshot) {
218 delegate.applySnapshot(snapshot);
221 @Override protected void onStateChanged() {
222 delegate.onStateChanged();
226 protected DataPersistenceProvider persistence() {
227 return this.dataPersistenceProvider;
231 protected Optional<ActorRef> getRoleChangeNotifier() {
232 return Optional.fromNullable(roleChangeNotifier);
235 @Override public String persistenceId() {
239 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
241 ByteArrayInputStream bis = null;
242 ObjectInputStream ois = null;
244 bis = new ByteArrayInputStream(bs);
245 ois = new ObjectInputStream(bis);
246 obj = ois.readObject();
258 public ReplicatedLog getReplicatedLog(){
259 return this.getRaftActorContext().getReplicatedLog();
265 private static class RaftActorTestKit extends JavaTestKit {
266 private final ActorRef raftActor;
268 public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
271 raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName,
272 Collections.<String,String>emptyMap(), Optional.<ConfigParams>absent()), actorName);
277 public ActorRef getRaftActor() {
281 public boolean waitForLogMessage(final Class<?> logEventClass, String message){
282 // Wait for a specific log message to show up
284 new JavaTestKit.EventFilter<Boolean>(logEventClass
287 protected Boolean run() {
290 }.from(raftActor.path().toString())
292 .occurrences(1).exec();
297 protected void waitUntilLeader(){
298 waitUntilLeader(raftActor);
301 protected void waitUntilLeader(ActorRef actorRef) {
302 FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
303 for(int i = 0; i < 20 * 5; i++) {
304 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
306 FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
307 if(resp.getLeaderActor() != null) {
310 } catch(TimeoutException e) {
311 } catch(Exception e) {
312 System.err.println("FindLeader threw ex");
317 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
320 Assert.fail("Leader not found for actorRef " + actorRef.path());
327 public void testConstruction() {
328 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
332 public void testFindLeaderWhenLeaderIsSelf(){
333 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
334 kit.waitUntilLeader();
338 public void testRaftActorRecovery() throws Exception {
339 new JavaTestKit(getSystem()) {{
340 String persistenceId = "follower10";
342 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
343 // Set the heartbeat interval high to essentially disable election otherwise the test
344 // may fail if the actor is switched to Leader and the commitIndex is set to the last
346 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
348 ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId,
349 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
351 watch(followerActor);
353 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
354 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
355 new MockRaftActorContext.MockPayload("E"));
356 snapshotUnappliedEntries.add(entry1);
358 int lastAppliedDuringSnapshotCapture = 3;
359 int lastIndexDuringSnapshotCapture = 4;
361 // 4 messages as part of snapshot, which are applied to state
362 ByteString snapshotBytes = fromObject(Arrays.asList(
363 new MockRaftActorContext.MockPayload("A"),
364 new MockRaftActorContext.MockPayload("B"),
365 new MockRaftActorContext.MockPayload("C"),
366 new MockRaftActorContext.MockPayload("D")));
368 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
369 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1 ,
370 lastAppliedDuringSnapshotCapture, 1);
371 MockSnapshotStore.setMockSnapshot(snapshot);
372 MockSnapshotStore.setPersistenceId(persistenceId);
374 // add more entries after snapshot is taken
375 List<ReplicatedLogEntry> entries = new ArrayList<>();
376 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
377 new MockRaftActorContext.MockPayload("F"));
378 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
379 new MockRaftActorContext.MockPayload("G"));
380 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
381 new MockRaftActorContext.MockPayload("H"));
386 int lastAppliedToState = 5;
389 MockAkkaJournal.addToJournal(5, entry2);
390 // 2 entries are applied to state besides the 4 entries in snapshot
391 MockAkkaJournal.addToJournal(6, new ApplyLogEntries(lastAppliedToState));
392 MockAkkaJournal.addToJournal(7, entry3);
393 MockAkkaJournal.addToJournal(8, entry4);
396 followerActor.tell(PoisonPill.getInstance(), null);
397 expectMsgClass(duration("5 seconds"), Terminated.class);
399 unwatch(followerActor);
401 //reinstate the actor
402 TestActorRef<MockRaftActor> ref = TestActorRef.create(getSystem(),
403 MockRaftActor.props(persistenceId, Collections.<String,String>emptyMap(),
404 Optional.<ConfigParams>of(config)));
406 ref.underlyingActor().waitForRecoveryComplete();
408 RaftActorContext context = ref.underlyingActor().getRaftActorContext();
409 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
410 context.getReplicatedLog().size());
411 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
412 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
413 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
414 assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
419 * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
420 * process recovery messages
426 public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
427 new JavaTestKit(getSystem()) {
429 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryApplicable";
431 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
433 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
435 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
436 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
438 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
440 // Wait for akka's recovery to complete so it doesn't interfere.
441 mockRaftActor.waitForRecoveryComplete();
443 ByteString snapshotBytes = fromObject(Arrays.asList(
444 new MockRaftActorContext.MockPayload("A"),
445 new MockRaftActorContext.MockPayload("B"),
446 new MockRaftActorContext.MockPayload("C"),
447 new MockRaftActorContext.MockPayload("D")));
449 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
450 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
452 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
454 verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
456 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
458 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
460 assertEquals("add replicated log entry", 1, replicatedLog.size());
462 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
464 assertEquals("add replicated log entry", 2, replicatedLog.size());
466 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
468 assertEquals("commit index 1", 1, mockRaftActor.getRaftActorContext().getCommitIndex());
470 // The snapshot had 4 items + we added 2 more items during the test
471 // We start removing from 5 and we should get 1 item in the replicated log
472 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(5));
474 assertEquals("remove log entries", 1, replicatedLog.size());
476 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
478 assertEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
479 assertEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
481 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
483 mockActorRef.tell(PoisonPill.getInstance(), getRef());
489 * This test verifies that when recovery is not applicable (typically when persistence is false) the RaftActor does
490 * not process recovery messages
495 public void testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable() throws Exception {
496 new JavaTestKit(getSystem()) {
498 String persistenceId = "testHandleRecoveryWhenDataPersistenceRecoveryNotApplicable";
500 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
502 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
504 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
505 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), new DataPersistenceProviderMonitor()), persistenceId);
507 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
509 // Wait for akka's recovery to complete so it doesn't interfere.
510 mockRaftActor.waitForRecoveryComplete();
512 ByteString snapshotBytes = fromObject(Arrays.asList(
513 new MockRaftActorContext.MockPayload("A"),
514 new MockRaftActorContext.MockPayload("B"),
515 new MockRaftActorContext.MockPayload("C"),
516 new MockRaftActorContext.MockPayload("D")));
518 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
519 Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
521 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
523 verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
525 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
527 ReplicatedLog replicatedLog = mockRaftActor.getReplicatedLog();
529 assertEquals("add replicated log entry", 0, replicatedLog.size());
531 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(1, 1, new MockRaftActorContext.MockPayload("A")));
533 assertEquals("add replicated log entry", 0, replicatedLog.size());
535 mockRaftActor.onReceiveRecover(new ApplyLogEntries(1));
537 assertEquals("commit index -1", -1, mockRaftActor.getRaftActorContext().getCommitIndex());
539 mockRaftActor.onReceiveRecover(new RaftActor.DeleteEntries(2));
541 assertEquals("remove log entries", 0, replicatedLog.size());
543 mockRaftActor.onReceiveRecover(new RaftActor.UpdateElectionTerm(10, "foobar"));
545 assertNotEquals("election term", 10, mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
546 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
548 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
550 mockActorRef.tell(PoisonPill.getInstance(), getRef());
556 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
557 new JavaTestKit(getSystem()) {
559 String persistenceId = "testUpdatingElectionTermCallsDataPersistence";
561 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
563 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
565 CountDownLatch persistLatch = new CountDownLatch(1);
566 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
567 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
569 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
570 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
572 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
574 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
576 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
578 mockActorRef.tell(PoisonPill.getInstance(), getRef());
585 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
586 new JavaTestKit(getSystem()) {
588 String persistenceId = "testAddingReplicatedLogEntryCallsDataPersistence";
590 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
592 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
594 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
596 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
597 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
599 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
601 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
603 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
605 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
607 mockActorRef.tell(PoisonPill.getInstance(), getRef());
614 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
615 new JavaTestKit(getSystem()) {
617 String persistenceId = "testRemovingReplicatedLogEntryCallsDataPersistence";
619 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
621 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
623 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
625 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
626 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
628 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
630 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
632 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
634 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
636 mockActorRef.tell(PoisonPill.getInstance(), getRef());
643 public void testApplyLogEntriesCallsDataPersistence() throws Exception {
644 new JavaTestKit(getSystem()) {
646 String persistenceId = "testApplyLogEntriesCallsDataPersistence";
648 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
650 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
652 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
654 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
655 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
657 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
659 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
661 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
663 mockActorRef.tell(PoisonPill.getInstance(), getRef());
670 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
671 new JavaTestKit(getSystem()) {
673 String persistenceId = "testCaptureSnapshotReplyCallsDataPersistence";
675 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
677 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
679 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
681 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
682 MockRaftActor.props(persistenceId,Collections.<String,String>emptyMap(),
683 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
685 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
687 ByteString snapshotBytes = fromObject(Arrays.asList(
688 new MockRaftActorContext.MockPayload("A"),
689 new MockRaftActorContext.MockPayload("B"),
690 new MockRaftActorContext.MockPayload("C"),
691 new MockRaftActorContext.MockPayload("D")));
693 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
695 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
697 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
699 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
701 verify(dataPersistenceProvider).saveSnapshot(anyObject());
703 mockActorRef.tell(PoisonPill.getInstance(), getRef());
710 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
711 new JavaTestKit(getSystem()) {
713 String persistenceId = "testSaveSnapshotSuccessCallsDataPersistence";
715 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
717 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
719 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
721 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
722 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
724 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
726 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
727 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
728 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
729 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
730 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
732 ByteString snapshotBytes = fromObject(Arrays.asList(
733 new MockRaftActorContext.MockPayload("A"),
734 new MockRaftActorContext.MockPayload("B"),
735 new MockRaftActorContext.MockPayload("C"),
736 new MockRaftActorContext.MockPayload("D")));
738 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
739 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
741 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
743 verify(mockRaftActor.delegate).createSnapshot();
745 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
747 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
749 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
751 verify(dataPersistenceProvider).deleteMessages(100);
753 assertEquals(2, mockRaftActor.getReplicatedLog().size());
755 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
756 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
758 // Index 2 will not be in the log because it was removed due to snapshotting
759 assertNull(mockRaftActor.getReplicatedLog().get(2));
761 mockActorRef.tell(PoisonPill.getInstance(), getRef());
768 public void testApplyState() throws Exception {
770 new JavaTestKit(getSystem()) {
772 String persistenceId = "testApplyState";
774 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
776 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
778 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
780 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
781 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
783 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
785 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
786 new MockRaftActorContext.MockPayload("F"));
788 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
790 verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
792 mockActorRef.tell(PoisonPill.getInstance(), getRef());
799 public void testApplySnapshot() throws Exception {
800 new JavaTestKit(getSystem()) {
802 String persistenceId = "testApplySnapshot";
804 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
806 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
808 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
810 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
811 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
813 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
815 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
817 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
818 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
819 oldReplicatedLog.append(
820 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
821 mock(Payload.class)));
823 ByteString snapshotBytes = fromObject(Arrays.asList(
824 new MockRaftActorContext.MockPayload("A"),
825 new MockRaftActorContext.MockPayload("B"),
826 new MockRaftActorContext.MockPayload("C"),
827 new MockRaftActorContext.MockPayload("D")));
829 Snapshot snapshot = mock(Snapshot.class);
831 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
833 doReturn(3L).when(snapshot).getLastAppliedIndex();
835 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
837 verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
839 assertTrue("The replicatedLog should have changed",
840 oldReplicatedLog != mockRaftActor.getReplicatedLog());
842 assertEquals("lastApplied should be same as in the snapshot",
843 (Long) 3L, mockRaftActor.getLastApplied());
845 assertEquals(0, mockRaftActor.getReplicatedLog().size());
847 mockActorRef.tell(PoisonPill.getInstance(), getRef());
854 public void testSaveSnapshotFailure() throws Exception {
855 new JavaTestKit(getSystem()) {
857 String persistenceId = "testSaveSnapshotFailure";
859 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
861 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
863 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
865 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
866 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
868 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
870 ByteString snapshotBytes = fromObject(Arrays.asList(
871 new MockRaftActorContext.MockPayload("A"),
872 new MockRaftActorContext.MockPayload("B"),
873 new MockRaftActorContext.MockPayload("C"),
874 new MockRaftActorContext.MockPayload("D")));
876 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
878 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
880 mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1));
882 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
884 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
887 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
888 mockRaftActor.getReplicatedLog().getSnapshotIndex());
890 mockActorRef.tell(PoisonPill.getInstance(), getRef());
897 public void testRaftRoleChangeNotifier() throws Exception {
898 new JavaTestKit(getSystem()) {{
899 ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
900 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
901 String id = "testRaftRoleChangeNotifier";
903 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id,
904 Collections.<String,String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), id);
906 // sleeping for a minimum of 2 seconds, if it spans more its fine.
907 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
909 List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
910 assertNotNull(matches);
911 assertEquals(3, matches.size());
913 // check if the notifier got a role change from null to Follower
914 RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
915 assertEquals(id, raftRoleChanged.getMemberId());
916 assertNull(raftRoleChanged.getOldRole());
917 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
919 // check if the notifier got a role change from Follower to Candidate
920 raftRoleChanged = (RoleChanged) matches.get(1);
921 assertEquals(id, raftRoleChanged.getMemberId());
922 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
923 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
925 // check if the notifier got a role change from Candidate to Leader
926 raftRoleChanged = (RoleChanged) matches.get(2);
927 assertEquals(id, raftRoleChanged.getMemberId());
928 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
929 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
934 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
935 new JavaTestKit(getSystem()) {
937 String persistenceId = "leader1";
939 ActorRef followerActor1 =
940 getSystem().actorOf(Props.create(MessageCollectorActor.class));
942 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
943 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
944 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
946 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
948 Map<String, String> peerAddresses = new HashMap<>();
949 peerAddresses.put("follower-1", followerActor1.path().toString());
951 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
952 MockRaftActor.props(persistenceId, peerAddresses,
953 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
955 MockRaftActor leaderActor = mockActorRef.underlyingActor();
956 leaderActor.getRaftActorContext().setCommitIndex(4);
957 leaderActor.getRaftActorContext().setLastApplied(4);
958 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
960 leaderActor.waitForInitializeBehaviorComplete();
962 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
964 Leader leader = new Leader(leaderActor.getRaftActorContext());
965 leaderActor.setCurrentBehavior(leader);
966 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
968 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
969 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
971 assertEquals(8, leaderActor.getReplicatedLog().size());
973 leaderActor.onReceiveCommand(new CaptureSnapshot(6,1,4,1));
974 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
975 verify(leaderActor.delegate).createSnapshot();
977 assertEquals(8, leaderActor.getReplicatedLog().size());
979 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
980 //fake snapshot on index 5
981 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 5, 1));
983 assertEquals(8, leaderActor.getReplicatedLog().size());
985 //fake snapshot on index 6
986 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
987 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 6, 1));
988 assertEquals(8, leaderActor.getReplicatedLog().size());
990 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
992 assertEquals(8, leaderActor.getReplicatedLog().size());
994 ByteString snapshotBytes = fromObject(Arrays.asList(
995 new MockRaftActorContext.MockPayload("foo-0"),
996 new MockRaftActorContext.MockPayload("foo-1"),
997 new MockRaftActorContext.MockPayload("foo-2"),
998 new MockRaftActorContext.MockPayload("foo-3"),
999 new MockRaftActorContext.MockPayload("foo-4")));
1000 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1001 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1003 // capture snapshot reply should remove the snapshotted entries only
1004 assertEquals(3, leaderActor.getReplicatedLog().size());
1005 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
1007 // add another non-replicated entry
1008 leaderActor.getReplicatedLog().append(
1009 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
1011 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
1012 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 7, 1));
1013 assertEquals(2, leaderActor.getReplicatedLog().size());
1014 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
1016 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1023 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
1024 new JavaTestKit(getSystem()) {
1026 String persistenceId = "follower1";
1028 ActorRef leaderActor1 =
1029 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1031 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1032 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1033 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1035 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1037 Map<String, String> peerAddresses = new HashMap<>();
1038 peerAddresses.put("leader", leaderActor1.path().toString());
1040 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1041 MockRaftActor.props(persistenceId, peerAddresses,
1042 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1044 MockRaftActor followerActor = mockActorRef.underlyingActor();
1045 followerActor.getRaftActorContext().setCommitIndex(4);
1046 followerActor.getRaftActorContext().setLastApplied(4);
1047 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1049 followerActor.waitForInitializeBehaviorComplete();
1051 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
1052 Follower follower = new Follower(followerActor.getRaftActorContext());
1053 followerActor.setCurrentBehavior(follower);
1054 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1056 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1057 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
1059 // log as indices 0-5
1060 assertEquals(6, followerActor.getReplicatedLog().size());
1063 followerActor.onReceiveCommand(new CaptureSnapshot(5,1,4,1));
1064 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
1065 verify(followerActor.delegate).createSnapshot();
1067 assertEquals(6, followerActor.getReplicatedLog().size());
1069 //fake snapshot on index 6
1070 List<ReplicatedLogEntry> entries =
1072 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
1073 new MockRaftActorContext.MockPayload("foo-6"))
1075 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 5, 1, entries , 5, 5));
1076 assertEquals(7, followerActor.getReplicatedLog().size());
1078 //fake snapshot on index 7
1079 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1083 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
1084 new MockRaftActorContext.MockPayload("foo-7"))
1086 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 6, 1, entries, 6, 6));
1087 assertEquals(8, followerActor.getReplicatedLog().size());
1089 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
1092 ByteString snapshotBytes = fromObject(Arrays.asList(
1093 new MockRaftActorContext.MockPayload("foo-0"),
1094 new MockRaftActorContext.MockPayload("foo-1"),
1095 new MockRaftActorContext.MockPayload("foo-2"),
1096 new MockRaftActorContext.MockPayload("foo-3"),
1097 new MockRaftActorContext.MockPayload("foo-4")));
1098 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1099 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
1101 // capture snapshot reply should remove the snapshotted entries only
1102 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
1103 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
1107 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
1108 new MockRaftActorContext.MockPayload("foo-7"))
1110 // send an additional entry 8 with leaderCommit = 7
1111 followerActor.onReceiveCommand(new AppendEntries(1, "leader", 7, 1, entries , 7, 7));
1113 // 7 and 8, as lastapplied is 7
1114 assertEquals(2, followerActor.getReplicatedLog().size());
1116 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1123 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
1124 new JavaTestKit(getSystem()) {
1126 String persistenceId = "leader1";
1128 ActorRef followerActor1 =
1129 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1130 ActorRef followerActor2 =
1131 getSystem().actorOf(Props.create(MessageCollectorActor.class));
1133 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1134 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1135 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1137 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
1139 Map<String, String> peerAddresses = new HashMap<>();
1140 peerAddresses.put("follower-1", followerActor1.path().toString());
1141 peerAddresses.put("follower-2", followerActor2.path().toString());
1143 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
1144 MockRaftActor.props(persistenceId, peerAddresses,
1145 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1147 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1148 leaderActor.getRaftActorContext().setCommitIndex(9);
1149 leaderActor.getRaftActorContext().setLastApplied(9);
1150 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1152 leaderActor.waitForInitializeBehaviorComplete();
1154 Leader leader = new Leader(leaderActor.getRaftActorContext());
1155 leaderActor.setCurrentBehavior(leader);
1156 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1158 // create 5 entries in the log
1159 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
1160 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
1161 //set the snapshot index to 4 , 0 to 4 are snapshotted
1162 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
1163 assertEquals(5, leaderActor.getReplicatedLog().size());
1165 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-1", 1, true, 9, 1));
1166 assertEquals(5, leaderActor.getReplicatedLog().size());
1168 // set the 2nd follower nextIndex to 1 which has been snapshotted
1169 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 0, 1));
1170 assertEquals(5, leaderActor.getReplicatedLog().size());
1172 // simulate a real snapshot
1173 leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
1174 assertEquals(5, leaderActor.getReplicatedLog().size());
1175 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
1176 leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId())
1177 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
1180 //reply from a slow follower does not initiate a fake snapshot
1181 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
1182 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1184 ByteString snapshotBytes = fromObject(Arrays.asList(
1185 new MockRaftActorContext.MockPayload("foo-0"),
1186 new MockRaftActorContext.MockPayload("foo-1"),
1187 new MockRaftActorContext.MockPayload("foo-2"),
1188 new MockRaftActorContext.MockPayload("foo-3"),
1189 new MockRaftActorContext.MockPayload("foo-4")));
1190 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1191 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
1193 assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
1195 //reply from a slow follower after should not raise errors
1196 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 5, 1));
1197 assertEquals(0, leaderActor.getReplicatedLog().size());
1199 mockActorRef.tell(PoisonPill.getInstance(), getRef());
1207 private ByteString fromObject(Object snapshot) throws Exception {
1208 ByteArrayOutputStream b = null;
1209 ObjectOutputStream o = null;
1211 b = new ByteArrayOutputStream();
1212 o = new ObjectOutputStream(b);
1213 o.writeObject(snapshot);
1214 byte[] snapshotBytes = b.toByteArray();
1215 return ByteString.copyFrom(snapshotBytes);