- public void testFindLeaderWhenLeaderIsSelf(){
- RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
- kit.waitUntilLeader();
- }
-
- @Test
- public void testRaftActorRecovery() throws Exception {
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("follower-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- // Set the heartbeat interval high to essentially disable election otherwise the test
- // may fail if the actor is switched to Leader and the commitIndex is set to the last
- // log entry.
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
-
- watch(followerActor);
-
- List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
- new MockRaftActorContext.MockPayload("E"));
- snapshotUnappliedEntries.add(entry1);
-
- int lastAppliedDuringSnapshotCapture = 3;
- int lastIndexDuringSnapshotCapture = 4;
-
- // 4 messages as part of snapshot, which are applied to state
- ByteString snapshotBytes = fromObject(Arrays.asList(
- new MockRaftActorContext.MockPayload("A"),
- new MockRaftActorContext.MockPayload("B"),
- new MockRaftActorContext.MockPayload("C"),
- new MockRaftActorContext.MockPayload("D")));
-
- Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
- snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
- lastAppliedDuringSnapshotCapture, 1);
- InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
-
- // add more entries after snapshot is taken
- List<ReplicatedLogEntry> entries = new ArrayList<>();
- ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
- new MockRaftActorContext.MockPayload("F"));
- ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
- new MockRaftActorContext.MockPayload("G"));
- ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
- new MockRaftActorContext.MockPayload("H"));
- entries.add(entry2);
- entries.add(entry3);
- entries.add(entry4);
-
- int lastAppliedToState = 5;
- int lastIndex = 7;
-
- InMemoryJournal.addEntry(persistenceId, 5, entry2);
- // 2 entries are applied to state besides the 4 entries in snapshot
- InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
- InMemoryJournal.addEntry(persistenceId, 7, entry3);
- InMemoryJournal.addEntry(persistenceId, 8, entry4);
-
- // kill the actor
- followerActor.tell(PoisonPill.getInstance(), null);
- expectMsgClass(duration("5 seconds"), Terminated.class);
-
- unwatch(followerActor);
-
- //reinstate the actor
- TestActorRef<MockRaftActor> ref = factory.createTestActor(
- MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
- Optional.<ConfigParams>of(config)));
-
- ref.underlyingActor().waitForRecoveryComplete();
-
- RaftActorContext context = ref.underlyingActor().getRaftActorContext();
- assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
- context.getReplicatedLog().size());
- assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
- assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
- assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
- assertEquals("Recovered state size", 6, ref.underlyingActor().getState().size());
- }};
- }
-
- @Test
- public void testRaftActorRecoveryWithPreLithuimApplyLogEntries() throws Exception {
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- // Setup the persisted journal with some entries
- ReplicatedLogEntry entry0 = new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
- new MockRaftActorContext.MockPayload("zero"));
- ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
- new MockRaftActorContext.MockPayload("oen"));
- ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 2,
- new MockRaftActorContext.MockPayload("two"));
-
- long seqNr = 1;
- InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
- InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
- InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
- InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
-
- int lastAppliedToState = 1;
- int lastIndex = 2;
-
- //reinstate the actor
- TestActorRef<MockRaftActor> leaderActor = factory.createTestActor(
- MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
- Optional.<ConfigParams>of(config)));
-
- leaderActor.underlyingActor().waitForRecoveryComplete();
-
- RaftActorContext context = leaderActor.underlyingActor().getRaftActorContext();
- assertEquals("Journal log size", 3, context.getReplicatedLog().size());
- assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
- assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
- assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
- }};
- }
-
- /**
- * This test verifies that when recovery is applicable (typically when persistence is true) the RaftActor does
- * process recovery messages
- *
- * @throws Exception
- */
-
- @Test
- public void testHandleRecoveryWhenDataPersistenceRecoveryApplicable() throws Exception {
- new JavaTestKit(getSystem()) {
- {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
-
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();