1 package org.opendaylight.controller.cluster.raft;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Matchers.any;
9 import static org.mockito.Matchers.anyObject;
10 import static org.mockito.Matchers.eq;
11 import static org.mockito.Matchers.same;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.times;
15 import static org.mockito.Mockito.verify;
16 import akka.actor.ActorRef;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Terminated;
20 import akka.japi.Procedure;
21 import akka.persistence.SaveSnapshotFailure;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.persistence.SnapshotMetadata;
24 import akka.persistence.SnapshotOffer;
25 import akka.persistence.SnapshotSelectionCriteria;
26 import akka.testkit.JavaTestKit;
27 import akka.testkit.TestActorRef;
28 import com.google.common.base.Optional;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import com.google.protobuf.ByteString;
32 import java.io.ByteArrayOutputStream;
33 import java.io.ObjectOutputStream;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.DataPersistenceProvider;
46 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
47 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
48 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
49 import org.opendaylight.controller.cluster.notifications.RoleChanged;
50 import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
51 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
56 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
57 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
58 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
59 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
60 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
64 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
65 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import scala.concurrent.duration.FiniteDuration;
69 public class RaftActorTest extends AbstractActorTest {
71 private TestActorFactory factory;
75 factory = new TestActorFactory(getSystem());
79 public void tearDown() throws Exception {
81 InMemoryJournal.clear();
82 InMemorySnapshotStore.clear();
86 public void testConstruction() {
87 new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
91 public void testFindLeaderWhenLeaderIsSelf(){
92 RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
93 kit.waitUntilLeader();
97 public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
98 new JavaTestKit(getSystem()) {{
99 String persistenceId = factory.generateActorId("follower-");
101 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
103 // Set the heartbeat interval high to essentially disable election otherwise the test
104 // may fail if the actor is switched to Leader and the commitIndex is set to the last
106 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
108 ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
109 ImmutableMap.<String, String>builder().put("member1", "address").build(),
110 Optional.<ConfigParams>of(config)), persistenceId);
112 watch(followerActor);
114 List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
115 ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
116 new MockRaftActorContext.MockPayload("E"));
117 snapshotUnappliedEntries.add(entry1);
119 int lastAppliedDuringSnapshotCapture = 3;
120 int lastIndexDuringSnapshotCapture = 4;
122 // 4 messages as part of snapshot, which are applied to state
123 ByteString snapshotBytes = fromObject(Arrays.asList(
124 new MockRaftActorContext.MockPayload("A"),
125 new MockRaftActorContext.MockPayload("B"),
126 new MockRaftActorContext.MockPayload("C"),
127 new MockRaftActorContext.MockPayload("D")));
129 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
130 snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
131 lastAppliedDuringSnapshotCapture, 1);
132 InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
134 // add more entries after snapshot is taken
135 List<ReplicatedLogEntry> entries = new ArrayList<>();
136 ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
137 new MockRaftActorContext.MockPayload("F", 2));
138 ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
139 new MockRaftActorContext.MockPayload("G", 3));
140 ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
141 new MockRaftActorContext.MockPayload("H", 4));
146 int lastAppliedToState = 5;
149 InMemoryJournal.addEntry(persistenceId, 5, entry2);
150 // 2 entries are applied to state besides the 4 entries in snapshot
151 InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
152 InMemoryJournal.addEntry(persistenceId, 7, entry3);
153 InMemoryJournal.addEntry(persistenceId, 8, entry4);
156 followerActor.tell(PoisonPill.getInstance(), null);
157 expectMsgClass(duration("5 seconds"), Terminated.class);
159 unwatch(followerActor);
161 //reinstate the actor
162 TestActorRef<MockRaftActor> ref = factory.createTestActor(
163 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
164 Optional.<ConfigParams>of(config)));
166 MockRaftActor mockRaftActor = ref.underlyingActor();
168 mockRaftActor.waitForRecoveryComplete();
170 RaftActorContext context = mockRaftActor.getRaftActorContext();
171 assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
172 context.getReplicatedLog().size());
173 assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
174 assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
175 assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
176 assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
177 assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
179 mockRaftActor.waitForInitializeBehaviorComplete();
181 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
186 public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
187 new JavaTestKit(getSystem()) {{
188 String persistenceId = factory.generateActorId("follower-");
190 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
192 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
194 TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
195 ImmutableMap.<String, String>builder().put("member1", "address").build(),
196 Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
198 MockRaftActor mockRaftActor = ref.underlyingActor();
200 mockRaftActor.waitForRecoveryComplete();
202 mockRaftActor.waitForInitializeBehaviorComplete();
204 assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
209 public void testRaftActorForwardsToRaftActorRecoverySupport() {
210 String persistenceId = factory.generateActorId("leader-");
212 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
214 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
216 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
217 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
219 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
221 // Wait for akka's recovery to complete so it doesn't interfere.
222 mockRaftActor.waitForRecoveryComplete();
224 RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
225 mockRaftActor.setRaftActorRecoverySupport(mockSupport );
227 Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
228 SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
229 mockRaftActor.handleRecover(snapshotOffer);
231 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
232 1, new MockRaftActorContext.MockPayload("1", 5));
233 mockRaftActor.handleRecover(logEntry);
235 ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
236 mockRaftActor.handleRecover(applyJournalEntries);
238 ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
239 mockRaftActor.handleRecover(applyLogEntries);
241 DeleteEntries deleteEntries = new DeleteEntries(1);
242 mockRaftActor.handleRecover(deleteEntries);
244 UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
245 mockRaftActor.handleRecover(updateElectionTerm);
247 verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
248 verify(mockSupport).handleRecoveryMessage(same(logEntry));
249 verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
250 verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
251 verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
252 verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
256 public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
257 new JavaTestKit(getSystem()) {
259 String persistenceId = factory.generateActorId("leader-");
261 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
263 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
265 CountDownLatch persistLatch = new CountDownLatch(1);
266 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
267 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
269 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
270 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
272 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
274 mockRaftActor.waitForInitializeBehaviorComplete();
276 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
278 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
284 public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
285 new JavaTestKit(getSystem()) {
287 String persistenceId = factory.generateActorId("leader-");
289 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
291 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
293 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
295 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
296 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
298 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
300 mockRaftActor.waitForInitializeBehaviorComplete();
302 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
304 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
306 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
312 public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
313 new JavaTestKit(getSystem()) {
315 String persistenceId = factory.generateActorId("leader-");
317 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
319 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
321 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
323 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
324 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
326 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
328 mockRaftActor.waitForInitializeBehaviorComplete();
330 mockRaftActor.waitUntilLeader();
332 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
334 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
336 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
342 public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
343 new JavaTestKit(getSystem()) {
345 String persistenceId = factory.generateActorId("leader-");
347 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
349 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
351 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
353 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
354 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
356 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
358 mockRaftActor.waitForInitializeBehaviorComplete();
360 mockRaftActor.waitUntilLeader();
362 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
364 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
372 public void testCaptureSnapshotReplyCallsDataPersistence() throws Exception {
373 new JavaTestKit(getSystem()) {
375 String persistenceId = factory.generateActorId("leader-");
377 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
379 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
381 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
383 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
384 MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
385 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
387 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
389 mockRaftActor.waitForInitializeBehaviorComplete();
391 ByteString snapshotBytes = fromObject(Arrays.asList(
392 new MockRaftActorContext.MockPayload("A"),
393 new MockRaftActorContext.MockPayload("B"),
394 new MockRaftActorContext.MockPayload("C"),
395 new MockRaftActorContext.MockPayload("D")));
397 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
399 raftActorContext.getSnapshotManager().capture(
400 new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
401 new MockRaftActorContext.MockPayload("D")), -1);
403 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
405 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
407 verify(dataPersistenceProvider).saveSnapshot(anyObject());
414 public void testSaveSnapshotSuccessCallsDataPersistence() throws Exception {
415 new JavaTestKit(getSystem()) {
417 String persistenceId = factory.generateActorId("leader-");
419 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
421 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
423 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
425 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
426 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
428 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
430 mockRaftActor.waitForInitializeBehaviorComplete();
431 MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
433 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
434 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
435 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
436 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
437 mockRaftActor.getReplicatedLog().append(lastEntry);
439 ByteString snapshotBytes = fromObject(Arrays.asList(
440 new MockRaftActorContext.MockPayload("A"),
441 new MockRaftActorContext.MockPayload("B"),
442 new MockRaftActorContext.MockPayload("C"),
443 new MockRaftActorContext.MockPayload("D")));
445 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
446 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
448 long replicatedToAllIndex = 1;
450 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
452 verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
454 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
456 mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
458 verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
460 verify(dataPersistenceProvider).deleteMessages(100);
462 assertEquals(3, mockRaftActor.getReplicatedLog().size());
463 assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
465 assertNotNull(mockRaftActor.getReplicatedLog().get(2));
466 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
467 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
469 // Index 2 will not be in the log because it was removed due to snapshotting
470 assertNull(mockRaftActor.getReplicatedLog().get(1));
471 assertNull(mockRaftActor.getReplicatedLog().get(0));
478 public void testApplyState() throws Exception {
480 new JavaTestKit(getSystem()) {
482 String persistenceId = factory.generateActorId("leader-");
484 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
486 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
488 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
490 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
491 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
493 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
495 mockRaftActor.waitForInitializeBehaviorComplete();
497 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
498 new MockRaftActorContext.MockPayload("F"));
500 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
502 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
509 public void testApplySnapshot() throws Exception {
510 new JavaTestKit(getSystem()) {
512 String persistenceId = factory.generateActorId("leader-");
514 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
516 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
518 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
520 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
521 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
523 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
525 mockRaftActor.waitForInitializeBehaviorComplete();
527 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
529 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
530 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
531 oldReplicatedLog.append(
532 new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
533 mock(Payload.class)));
535 ByteString snapshotBytes = fromObject(Arrays.asList(
536 new MockRaftActorContext.MockPayload("A"),
537 new MockRaftActorContext.MockPayload("B"),
538 new MockRaftActorContext.MockPayload("C"),
539 new MockRaftActorContext.MockPayload("D")));
541 Snapshot snapshot = mock(Snapshot.class);
543 doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
545 doReturn(3L).when(snapshot).getLastAppliedIndex();
547 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
549 verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
551 assertTrue("The replicatedLog should have changed",
552 oldReplicatedLog != mockRaftActor.getReplicatedLog());
554 assertEquals("lastApplied should be same as in the snapshot",
555 (Long) 3L, mockRaftActor.getLastApplied());
557 assertEquals(0, mockRaftActor.getReplicatedLog().size());
564 public void testSaveSnapshotFailure() throws Exception {
565 new JavaTestKit(getSystem()) {
567 String persistenceId = factory.generateActorId("leader-");
569 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
571 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
573 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
575 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
576 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
578 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
580 mockRaftActor.waitForInitializeBehaviorComplete();
582 ByteString snapshotBytes = fromObject(Arrays.asList(
583 new MockRaftActorContext.MockPayload("A"),
584 new MockRaftActorContext.MockPayload("B"),
585 new MockRaftActorContext.MockPayload("C"),
586 new MockRaftActorContext.MockPayload("D")));
588 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
590 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
592 raftActorContext.getSnapshotManager().capture(
593 new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
594 new MockRaftActorContext.MockPayload("D")), 1);
596 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
598 mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
601 assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
602 mockRaftActor.getReplicatedLog().getSnapshotIndex());
609 public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
610 new JavaTestKit(getSystem()) {{
611 TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
612 Props.create(MessageCollectorActor.class));
613 MessageCollectorActor.waitUntilReady(notifierActor);
615 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
616 long heartBeatInterval = 100;
617 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
618 config.setElectionTimeoutFactor(20);
620 String persistenceId = factory.generateActorId("notifier-");
622 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
623 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
624 new NonPersistentDataProvider()), persistenceId);
626 List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
629 // check if the notifier got a role change from null to Follower
630 RoleChanged raftRoleChanged = matches.get(0);
631 assertEquals(persistenceId, raftRoleChanged.getMemberId());
632 assertNull(raftRoleChanged.getOldRole());
633 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
635 // check if the notifier got a role change from Follower to Candidate
636 raftRoleChanged = matches.get(1);
637 assertEquals(persistenceId, raftRoleChanged.getMemberId());
638 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
639 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
641 // check if the notifier got a role change from Candidate to Leader
642 raftRoleChanged = matches.get(2);
643 assertEquals(persistenceId, raftRoleChanged.getMemberId());
644 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
645 assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
647 LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
648 notifierActor, LeaderStateChanged.class);
650 assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
652 notifierActor.underlyingActor().clear();
654 MockRaftActor raftActor = raftActorRef.underlyingActor();
655 final String newLeaderId = "new-leader";
656 Follower follower = new Follower(raftActor.getRaftActorContext()) {
658 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
659 leaderId = newLeaderId;
664 raftActor.changeCurrentBehavior(follower);
666 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
667 assertEquals(persistenceId, leaderStateChange.getMemberId());
668 assertEquals(null, leaderStateChange.getLeaderId());
670 raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
671 assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
672 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
674 notifierActor.underlyingActor().clear();
676 raftActor.handleCommand("any");
678 leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
679 assertEquals(persistenceId, leaderStateChange.getMemberId());
680 assertEquals(newLeaderId, leaderStateChange.getLeaderId());
685 public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
686 new JavaTestKit(getSystem()) {{
687 ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
688 MessageCollectorActor.waitUntilReady(notifierActor);
690 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
691 long heartBeatInterval = 100;
692 config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
693 config.setElectionTimeoutFactor(1);
695 String persistenceId = factory.generateActorId("notifier-");
697 factory.createActor(MockRaftActor.props(persistenceId,
698 ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
700 List<RoleChanged> matches = null;
701 for(int i = 0; i < 5000 / heartBeatInterval; i++) {
702 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
703 assertNotNull(matches);
704 if(matches.size() == 3) {
707 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
710 assertEquals(2, matches.size());
712 // check if the notifier got a role change from null to Follower
713 RoleChanged raftRoleChanged = matches.get(0);
714 assertEquals(persistenceId, raftRoleChanged.getMemberId());
715 assertNull(raftRoleChanged.getOldRole());
716 assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
718 // check if the notifier got a role change from Follower to Candidate
719 raftRoleChanged = matches.get(1);
720 assertEquals(persistenceId, raftRoleChanged.getMemberId());
721 assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
722 assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
728 public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
729 new JavaTestKit(getSystem()) {
731 String persistenceId = factory.generateActorId("leader-");
732 String follower1Id = factory.generateActorId("follower-");
734 ActorRef followerActor1 =
735 factory.createActor(Props.create(MessageCollectorActor.class));
737 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
738 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
739 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
741 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
743 Map<String, String> peerAddresses = new HashMap<>();
744 peerAddresses.put(follower1Id, followerActor1.path().toString());
746 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
747 MockRaftActor.props(persistenceId, peerAddresses,
748 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
750 MockRaftActor leaderActor = mockActorRef.underlyingActor();
752 leaderActor.getRaftActorContext().setCommitIndex(4);
753 leaderActor.getRaftActorContext().setLastApplied(4);
754 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
756 leaderActor.waitForInitializeBehaviorComplete();
758 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
760 Leader leader = new Leader(leaderActor.getRaftActorContext());
761 leaderActor.setCurrentBehavior(leader);
762 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
764 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
765 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
767 assertEquals(8, leaderActor.getReplicatedLog().size());
769 leaderActor.getRaftActorContext().getSnapshotManager()
770 .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
771 new MockRaftActorContext.MockPayload("x")), 4);
773 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
775 assertEquals(8, leaderActor.getReplicatedLog().size());
777 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
778 //fake snapshot on index 5
779 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
781 assertEquals(8, leaderActor.getReplicatedLog().size());
783 //fake snapshot on index 6
784 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
785 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
786 assertEquals(8, leaderActor.getReplicatedLog().size());
788 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
790 assertEquals(8, leaderActor.getReplicatedLog().size());
792 ByteString snapshotBytes = fromObject(Arrays.asList(
793 new MockRaftActorContext.MockPayload("foo-0"),
794 new MockRaftActorContext.MockPayload("foo-1"),
795 new MockRaftActorContext.MockPayload("foo-2"),
796 new MockRaftActorContext.MockPayload("foo-3"),
797 new MockRaftActorContext.MockPayload("foo-4")));
799 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
800 , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
802 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
804 // The commit is needed to complete the snapshot creation process
805 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
807 // capture snapshot reply should remove the snapshotted entries only
808 assertEquals(3, leaderActor.getReplicatedLog().size());
809 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
811 // add another non-replicated entry
812 leaderActor.getReplicatedLog().append(
813 new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
815 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
816 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
817 assertEquals(2, leaderActor.getReplicatedLog().size());
818 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
825 public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
826 new JavaTestKit(getSystem()) {
828 String persistenceId = factory.generateActorId("follower-");
829 String leaderId = factory.generateActorId("leader-");
832 ActorRef leaderActor1 =
833 factory.createActor(Props.create(MessageCollectorActor.class));
835 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
836 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
837 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
839 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
841 Map<String, String> peerAddresses = new HashMap<>();
842 peerAddresses.put(leaderId, leaderActor1.path().toString());
844 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
845 MockRaftActor.props(persistenceId, peerAddresses,
846 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
848 MockRaftActor followerActor = mockActorRef.underlyingActor();
849 followerActor.getRaftActorContext().setCommitIndex(4);
850 followerActor.getRaftActorContext().setLastApplied(4);
851 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
853 followerActor.waitForInitializeBehaviorComplete();
856 Follower follower = new Follower(followerActor.getRaftActorContext());
857 followerActor.setCurrentBehavior(follower);
858 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
860 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
861 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
862 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
864 // log has indices 0-5
865 assertEquals(6, followerActor.getReplicatedLog().size());
868 followerActor.getRaftActorContext().getSnapshotManager().capture(
869 new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
870 new MockRaftActorContext.MockPayload("D")), 4);
872 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
874 assertEquals(6, followerActor.getReplicatedLog().size());
876 //fake snapshot on index 6
877 List<ReplicatedLogEntry> entries =
879 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
880 new MockRaftActorContext.MockPayload("foo-6"))
882 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
883 assertEquals(7, followerActor.getReplicatedLog().size());
885 //fake snapshot on index 7
886 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
890 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
891 new MockRaftActorContext.MockPayload("foo-7"))
893 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
894 assertEquals(8, followerActor.getReplicatedLog().size());
896 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
899 ByteString snapshotBytes = fromObject(Arrays.asList(
900 new MockRaftActorContext.MockPayload("foo-0"),
901 new MockRaftActorContext.MockPayload("foo-1"),
902 new MockRaftActorContext.MockPayload("foo-2"),
903 new MockRaftActorContext.MockPayload("foo-3"),
904 new MockRaftActorContext.MockPayload("foo-4")));
905 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
906 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
908 // The commit is needed to complete the snapshot creation process
909 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
911 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
912 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
913 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
917 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
918 new MockRaftActorContext.MockPayload("foo-7"))
920 // send an additional entry 8 with leaderCommit = 7
921 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
923 // 7 and 8, as lastapplied is 7
924 assertEquals(2, followerActor.getReplicatedLog().size());
931 public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
932 new JavaTestKit(getSystem()) {
934 String persistenceId = factory.generateActorId("leader-");
935 String follower1Id = factory.generateActorId("follower-");
936 String follower2Id = factory.generateActorId("follower-");
938 ActorRef followerActor1 =
939 factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
940 ActorRef followerActor2 =
941 factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
943 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
944 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
945 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
947 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
949 Map<String, String> peerAddresses = new HashMap<>();
950 peerAddresses.put(follower1Id, followerActor1.path().toString());
951 peerAddresses.put(follower2Id, followerActor2.path().toString());
953 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
954 MockRaftActor.props(persistenceId, peerAddresses,
955 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
957 MockRaftActor leaderActor = mockActorRef.underlyingActor();
958 leaderActor.getRaftActorContext().setCommitIndex(9);
959 leaderActor.getRaftActorContext().setLastApplied(9);
960 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
962 leaderActor.waitForInitializeBehaviorComplete();
964 Leader leader = new Leader(leaderActor.getRaftActorContext());
965 leaderActor.setCurrentBehavior(leader);
966 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
968 // create 5 entries in the log
969 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
970 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
972 //set the snapshot index to 4 , 0 to 4 are snapshotted
973 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
974 //setting replicatedToAllIndex = 9, for the log to clear
975 leader.setReplicatedToAllIndex(9);
976 assertEquals(5, leaderActor.getReplicatedLog().size());
977 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
979 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
980 assertEquals(5, leaderActor.getReplicatedLog().size());
981 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
983 // set the 2nd follower nextIndex to 1 which has been snapshotted
984 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
985 assertEquals(5, leaderActor.getReplicatedLog().size());
986 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
988 // simulate a real snapshot
989 leaderActor.onReceiveCommand(new SendHeartBeat());
990 assertEquals(5, leaderActor.getReplicatedLog().size());
991 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
992 leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
993 , RaftState.Leader, leaderActor.getCurrentBehavior().state());
996 //reply from a slow follower does not initiate a fake snapshot
997 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
998 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
1000 ByteString snapshotBytes = fromObject(Arrays.asList(
1001 new MockRaftActorContext.MockPayload("foo-0"),
1002 new MockRaftActorContext.MockPayload("foo-1"),
1003 new MockRaftActorContext.MockPayload("foo-2"),
1004 new MockRaftActorContext.MockPayload("foo-3"),
1005 new MockRaftActorContext.MockPayload("foo-4")));
1006 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
1007 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1009 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
1011 //reply from a slow follower after should not raise errors
1012 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
1013 assertEquals(0, leaderActor.getReplicatedLog().size());
1019 public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
1020 new JavaTestKit(getSystem()) {{
1021 String persistenceId = factory.generateActorId("leader-");
1022 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1023 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1024 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1025 config.setSnapshotBatchCount(5);
1027 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1029 Map<String, String> peerAddresses = new HashMap<>();
1031 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1032 MockRaftActor.props(persistenceId, peerAddresses,
1033 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1035 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1036 leaderActor.getRaftActorContext().setCommitIndex(3);
1037 leaderActor.getRaftActorContext().setLastApplied(3);
1038 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1040 leaderActor.waitForInitializeBehaviorComplete();
1041 for(int i=0;i< 4;i++) {
1042 leaderActor.getReplicatedLog()
1043 .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
1044 new MockRaftActorContext.MockPayload("A")));
1047 Leader leader = new Leader(leaderActor.getRaftActorContext());
1048 leaderActor.setCurrentBehavior(leader);
1049 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1051 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1052 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1054 // Now send a CaptureSnapshotReply
1055 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1057 // Trimming log in this scenario is a no-op
1058 assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
1059 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1060 assertEquals(-1, leader.getReplicatedToAllIndex());
1066 public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
1067 new JavaTestKit(getSystem()) {{
1068 String persistenceId = factory.generateActorId("leader-");
1069 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
1070 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
1071 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
1072 config.setSnapshotBatchCount(5);
1074 DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
1076 Map<String, String> peerAddresses = new HashMap<>();
1078 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
1079 MockRaftActor.props(persistenceId, peerAddresses,
1080 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
1082 MockRaftActor leaderActor = mockActorRef.underlyingActor();
1083 leaderActor.getRaftActorContext().setCommitIndex(3);
1084 leaderActor.getRaftActorContext().setLastApplied(3);
1085 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
1086 leaderActor.getReplicatedLog().setSnapshotIndex(3);
1088 leaderActor.waitForInitializeBehaviorComplete();
1089 Leader leader = new Leader(leaderActor.getRaftActorContext());
1090 leaderActor.setCurrentBehavior(leader);
1091 leader.setReplicatedToAllIndex(3);
1092 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
1094 // Persist another entry (this will cause a CaptureSnapshot to be triggered
1095 leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
1097 // Now send a CaptureSnapshotReply
1098 mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
1100 // Trimming log in this scenario is a no-op
1101 assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
1102 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
1103 assertEquals(3, leader.getReplicatedToAllIndex());
1108 private ByteString fromObject(Object snapshot) throws Exception {
1109 ByteArrayOutputStream b = null;
1110 ObjectOutputStream o = null;
1112 b = new ByteArrayOutputStream();
1113 o = new ObjectOutputStream(b);
1114 o.writeObject(snapshot);
1115 byte[] snapshotBytes = b.toByteArray();
1116 return ByteString.copyFrom(snapshotBytes);