initializeBehavior();
raftRecovery = null;
-
- if (context.getReplicatedLog().size() > 0) {
- self().tell(new InitiateCaptureSnapshot(), self());
- LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
- } else {
- LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
- }
}
}
}
private boolean isLogEntryPresent(long index){
- if(index == context.getReplicatedLog().getSnapshotIndex()){
+ if(context.getReplicatedLog().isInSnapshot(index)) {
return true;
}
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
- && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
- && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
- && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
- // Wait for snapshot after recovery
- MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+ node1RaftActor.waitForInitializeBehaviorComplete();
+ node2RaftActor.waitForInitializeBehaviorComplete();
// Verify the intended server config was loaded and applied.
verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
- MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
+ verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+ nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
+ votingServer("downNode2"));
assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
// For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
- // Wait for snapshot after recovery
- MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
-
// Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
// node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
// RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
- // Wait for snapshot after recovery
- MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
-
// Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
// node1 to try to elect itself as leader in order to apply the new server config. However node1's log
// is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
- // Wait for snapshot after recovery
- MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
-
// Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
// node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
// RequestVote messages in node2 and make it the leader so node1 should forward the server change
}};
}
- @Test
- public void testRaftActorOnRecoverySnapshot() throws Exception {
- TEST_LOG.info("testRaftActorOnRecoverySnapshot");
-
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("follower-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- // Set the heartbeat interval high to essentially disable election otherwise the test
- // may fail if the actor is switched to Leader
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
-
- // Create mock ReplicatedLogEntry
- ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
- new MockRaftActorContext.MockPayload("F", 1));
-
- InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
-
- TestActorRef<MockRaftActor> ref = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config));
-
- MockRaftActor mockRaftActor = ref.underlyingActor();
-
- mockRaftActor.waitForRecoveryComplete();
-
- mockRaftActor.waitForInitializeBehaviorComplete();
-
- verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
- }};
- }
-
@Test
public void testSwitchBehavior(){
String persistenceId = factory.generateActorId("leader-");
mockRaftActor.waitForRecoveryComplete();
- // Wait for snapshot after recovery
- verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
-
mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
// Verify the persisted snapshot in the leader. This should reflect the advanced snapshot index as
// the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- assertEquals("Persisted snapshots size", 2, persistedSnapshots.size());
- verifySnapshot("Persisted", persistedSnapshots.get(1), initialTerm, 2, currentTerm, 3);
- List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(1).getUnAppliedEntries();
+ assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
+ verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
+ List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);