X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRecoveryIntegrationTest.java;h=2a881b69ee64c24fa03556a37f8783c052b9be2d;hp=a8f490e75119678fc84084c50c85dec204941b1d;hb=6dbf8f82cfa9fe8c35e4085213a55cb887cc3aee;hpb=f9a9cd1ea40d2477ccb16b03c71a87595226595a diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java index a8f490e751..2a881b69ee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java @@ -8,16 +8,22 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; + +import akka.actor.ActorRef; import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; /** @@ -35,11 +41,12 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), newFollowerConfigParams()); - peerAddresses = ImmutableMap.builder(). - put(follower1Id, follower1Actor.path().toString()).build(); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(follower1Id, follower1Actor.path().toString()); + leaderPeerAddresses.put(follower2Id, ""); leaderConfigParams = newLeaderConfigParams(); - leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams); follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); @@ -53,31 +60,23 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { send2InitialPayloads(); // Block these messages initially so we can control the sequence. - leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class); follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); - MockPayload payload2 = sendPayloadData(leaderActor, "two"); + final MockPayload payload2 = sendPayloadData(leaderActor, "two"); // This should trigger a snapshot. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); + final MockPayload payload3 = sendPayloadData(leaderActor, "three"); MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( - leaderCollectorActor, CaptureSnapshot.class); - - // First, deliver the CaptureSnapshot to the leader. - leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); - leaderActor.tell(captureSnapshot, leaderActor); - // Send another payload. - MockPayload payload4 = sendPayloadData(leaderActor, "four"); + final MockPayload payload4 = sendPayloadData(leaderActor, "four"); // Now deliver the AppendEntries to the follower follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); - MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1); // Now deliver the CaptureSnapshotReply to the leader. CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching( @@ -102,38 +101,30 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { } @Test - public void testStatePersistedBetweenInitiateSnapshotAndCapture() { + public void testStatePersistedAfterSnapshotPersisted() { send2InitialPayloads(); // Block these messages initially so we can control the sequence. - leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); - MockPayload payload2 = sendPayloadData(leaderActor, "two"); + final MockPayload payload2 = sendPayloadData(leaderActor, "two"); // This should trigger a snapshot. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); + final MockPayload payload3 = sendPayloadData(leaderActor, "three"); // Send another payload. - MockPayload payload4 = sendPayloadData(leaderActor, "four"); + final MockPayload payload4 = sendPayloadData(leaderActor, "four"); MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( - leaderCollectorActor, CaptureSnapshot.class); + // Wait for snapshot complete. + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - // First, deliver the AppendEntries to the follower + // Now deliver the AppendEntries to the follower follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); - MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); - - // Now deliver the CaptureSnapshot to the leader. - leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); - leaderActor.tell(captureSnapshot, leaderActor); - - // Wait for snapshot complete. - MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1); reinstateLeaderActor(); @@ -144,53 +135,73 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); - // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so - // were included in the snapshot. They were also included as unapplied entries in the snapshot as - // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the - // state on recovery by the ApplyJournalEntries messages which remained in the persisted log. - // This is a side effect of trimming the persisted log to the sequence number captured at the time - // the snapshot was initiated. - assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2, - payload3, payload4), leaderActor.underlyingActor().getState()); + assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4), + leaderActor.underlyingActor().getState()); } @Test - public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() { + public void testFollowerRecoveryAfterInstallSnapshot() throws Exception { send2InitialPayloads(); - // Block these messages initially so we can control the sequence. - follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + leader = leaderActor.underlyingActor().getCurrentBehavior(); - MockPayload payload2 = sendPayloadData(leaderActor, "two"); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); - // This should trigger a snapshot. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); + leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender()); - // Send another payload. - MockPayload payload4 = sendPayloadData(leaderActor, "four"); + final MockPayload payload2 = sendPayloadData(leaderActor, "two"); - MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3); + // Verify the leader applies the 3rd payload state. + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1); - // Wait for snapshot complete. - MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1); - // Now deliver the AppendEntries to the follower - follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class); + assertEquals("Leader commit index", 2, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 2, leaderContext.getLastApplied()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex()); - MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3); + killActor(follower2Actor); - reinstateLeaderActor(); + InMemoryJournal.clear(); - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 4, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 4, leaderContext.getLastApplied()); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + TestRaftActor follower2Underlying = follower2Actor.underlyingActor(); + follower2CollectorActor = follower2Underlying.collectorActor(); + follower2Context = follower2Underlying.getRaftActorContext(); - assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4), - leaderActor.underlyingActor().getState()); + leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender()); + + // The leader should install a snapshot so wait for the follower to receive ApplySnapshot. + MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); + + // Wait for the follower to persist the snapshot. + MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class); + + final List expFollowerState = Arrays.asList(payload0, payload1, payload2); + + assertEquals("Follower commit index", 2, follower2Context.getCommitIndex()); + assertEquals("Follower last applied", 2, follower2Context.getLastApplied()); + assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Follower state", expFollowerState, follower2Underlying.getState()); + + killActor(follower2Actor); + + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)), + newFollowerConfigParams()); + + follower2Underlying = follower2Actor.underlyingActor(); + follower2Underlying.waitForRecoveryComplete(); + follower2Context = follower2Underlying.getRaftActorContext(); + + assertEquals("Follower commit index", 2, follower2Context.getCommitIndex()); + assertEquals("Follower last applied", 2, follower2Context.getLastApplied()); + assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Follower state", expFollowerState, follower2Underlying.getState()); } private void reinstateLeaderActor() { @@ -208,6 +219,9 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { currentTerm = leaderContext.getTermInformation().getCurrentTerm(); payload0 = sendPayloadData(leaderActor, "zero"); + + MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1); + payload1 = sendPayloadData(leaderActor, "one"); // Verify the leader applies the states.