package org.opendaylight.controller.cluster.raft;
import static org.junit.Assert.assertEquals;
+import akka.actor.ActorRef;
import akka.persistence.SaveSnapshotSuccess;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
/**
follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
newFollowerConfigParams());
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).build();
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1Id, follower1Actor.path().toString());
+ peerAddresses.put(follower2Id, "");
leaderConfigParams = newLeaderConfigParams();
leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
send2InitialPayloads();
// Block these messages initially so we can control the sequence.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
-
- // First, deliver the CaptureSnapshot to the leader.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
// Send another payload.
MockPayload payload4 = sendPayloadData(leaderActor, "four");
// Now deliver the AppendEntries to the follower
follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
// Now deliver the CaptureSnapshotReply to the leader.
CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
}
@Test
- public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+ public void testStatePersistedAfterSnapshotPersisted() {
send2InitialPayloads();
// Block these messages initially so we can control the sequence.
- leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
MockPayload payload2 = sendPayloadData(leaderActor, "two");
MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
- CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
- leaderCollectorActor, CaptureSnapshot.class);
+ // Wait for snapshot complete.
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
- // First, deliver the AppendEntries to the follower
+ // Now deliver the AppendEntries to the follower
follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
- // Now deliver the CaptureSnapshot to the leader.
- leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
- leaderActor.tell(captureSnapshot, leaderActor);
-
- // Wait for snapshot complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
reinstateLeaderActor();
assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
- // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
- // were included in the snapshot. They were also included as unapplied entries in the snapshot as
- // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
- // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
- // This is a side effect of trimming the persisted log to the sequence number captured at the time
- // the snapshot was initiated.
- assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
- payload3, payload4), leaderActor.underlyingActor().getState());
+ assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+ leaderActor.underlyingActor().getState());
}
@Test
- public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+ public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
send2InitialPayloads();
- // Block these messages initially so we can control the sequence.
- follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+ leader = leaderActor.underlyingActor().getCurrentBehavior();
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+
+ leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
MockPayload payload2 = sendPayloadData(leaderActor, "two");
- // This should trigger a snapshot.
- MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ // Verify the leader applies the 3rd payload state.
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
- // Send another payload.
- MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
- MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+ assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+ assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
+ assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
- // Wait for snapshot complete.
- MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+ killActor(follower2Actor);
- // Now deliver the AppendEntries to the follower
- follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+ InMemoryJournal.clear();
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+ TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
+ follower2CollectorActor = follower2Underlying.collectorActor();
+ follower2Context = follower2Underlying.getRaftActorContext();
- reinstateLeaderActor();
+ leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
- assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
- assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
- assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
- assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
- assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
- assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+ // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
- assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
- leaderActor.underlyingActor().getState());
+ // Wait for the follower to persist the snapshot.
+ MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
+ List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
+
+ assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+ assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+ assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+
+ killActor(follower2Actor);
+
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+ newFollowerConfigParams());
+
+ follower2Underlying = follower2Actor.underlyingActor();
+ follower2Underlying.waitForRecoveryComplete();
+ follower2Context = follower2Underlying.getRaftActorContext();
+
+ assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+ assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+ assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
+ assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
}
private void reinstateLeaderActor() {
currentTerm = leaderContext.getTermInformation().getCurrentTerm();
payload0 = sendPayloadData(leaderActor, "zero");
+
+ MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+
payload1 = sendPayloadData(leaderActor, "one");
// Verify the leader applies the states.