X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicationAndSnapshotsIntegrationTest.java;h=386108f61529258a13fb91c5f83eac8c41c4d8f3;hp=7a291f364b1ce9e6646d9460c665b4610e3a5f79;hb=e1eca73a5ae2ffae8dd78c6fe5281cd2f45d5ef3;hpb=1e884647502a8d91f8a57bde8193c60b9bbcce0d diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java index 7a291f364b..386108f615 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java @@ -8,18 +8,18 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; + import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; import java.util.List; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; -import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -36,13 +36,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt private MockPayload recoveredPayload0; private MockPayload recoveredPayload1; private MockPayload recoveredPayload2; + private MockPayload payload3; private MockPayload payload4; private MockPayload payload5; private MockPayload payload6; private MockPayload payload7; @Test - public void runTest() { + public void runTest() throws Exception { testLog.info("testReplicationAndSnapshots starting"); // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less @@ -55,20 +56,24 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1)); recoveredPayload2 = new MockPayload("two"); InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2)); - InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2)); + InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2)); origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class); // Create the leader and 2 follower actors and verify initial syncing of the followers after leader // persistence recovery. - follower1Actor = newTestRaftActor(follower1Id, null, newFollowerConfigParams()); + DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams(); + followerConfigParams.setSnapshotBatchCount(snapshotBatchCount); + follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower2Id, testActorPath(follower2Id)), followerConfigParams); - follower2Actor = newTestRaftActor(follower2Id, null, newFollowerConfigParams()); + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), followerConfigParams); - peerAddresses = ImmutableMap.builder(). - put(follower1Id, follower1Actor.path().toString()). - put(follower2Id, follower2Actor.path().toString()).build(); + peerAddresses = ImmutableMap.builder() + .put(follower1Id, follower1Actor.path().toString()) + .put(follower2Id, follower2Actor.path().toString()).build(); leaderConfigParams = newLeaderConfigParams(); leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); @@ -156,18 +161,19 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt * scenario, the follower consensus and application of state is delayed until after the snapshot * completes. */ - private void testFirstSnapshot() { + private void testFirstSnapshot() throws Exception { testLog.info("testFirstSnapshot starting"); - byte[] snapshot = new byte[] {1,2,3,4}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(recoveredPayload0); + expSnapshotState.add(recoveredPayload1); + expSnapshotState.add(recoveredPayload2); // Delay the consensus by temporarily dropping the AppendEntries to both followers. follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send the payload. - MockPayload payload3 = sendPayloadData(leaderActor, "three"); + payload3 = sendPayloadData(leaderActor, "three"); // Wait for snapshot complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); @@ -183,13 +189,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3); // The leader's persisted journal log should be cleared since we snapshotted. - List persistedLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class); + List persistedLeaderJournal = + InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class); assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size()); // Allow AppendEntries to both followers to proceed. This should catch up the followers and cause a @@ -222,6 +229,18 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader last applied", 3, leaderContext.getLastApplied()); assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex()); + // The followers should also snapshot so verify. + + MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SaveSnapshotSuccess.class); + persistedSnapshots = InMemorySnapshotStore.getSnapshots(follower1Id, Snapshot.class); + assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); + // The last applied index in the snapshot may or may not be the last log entry depending on + // timing so to avoid intermittent test failures, we'll just verify the snapshot's last term/index. + assertEquals("Follower1 Snapshot getLastTerm", currentTerm, persistedSnapshots.get(0).getLastTerm()); + assertEquals("Follower1 Snapshot getLastIndex", 3, persistedSnapshots.get(0).getLastIndex()); + + MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class); + MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); @@ -285,21 +304,23 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until * consensus occurs and the leader applies the state. */ - private void testSecondSnapshot() { + private void testSecondSnapshot() throws Exception { testLog.info("testSecondSnapshot starting"); - byte[] snapshot = new byte[] {5,6,7,8}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload3); + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); // Delay the CaptureSnapshot message to the leader actor. - leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class); + leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class); // Send the payload. payload7 = sendPayloadData(leaderActor, "seven"); - // Capture the CaptureSnapshot message so we can send it later. - CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching( - leaderCollectorActor, CaptureSnapshot.class); + // Capture the CaptureSnapshotReply message so we can send it later. + final CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching( + leaderCollectorActor, CaptureSnapshotReply.class); // Wait for the state to be applied in the leader. ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); @@ -319,12 +340,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader last applied", 7, leaderContext.getLastApplied()); assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex()); - // Now deliver the CaptureSnapshot. - leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class); - leaderActor.tell(captureSnapshot, leaderActor); - - // Wait for CaptureSnapshotReply to complete. - MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class); + // Now deliver the CaptureSnapshotReply. + leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class); + leaderActor.tell(captureSnapshotReply, leaderActor); // Wait for snapshot complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); @@ -339,11 +357,12 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 7, leaderContext.getCommitIndex()); - // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied - // log entry (6). + // Verify the persisted snapshot. This should reflect the snapshot index as the last applied + // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data + // when the snapshot is created (ie when the CaptureSnapshot is processed). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7); @@ -354,7 +373,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt assertEquals("Persisted journal log size", 0, persistedLeaderJournal.size()); // Verify the followers apply all 4 new log entries. - List applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 4); + List applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, + ApplyState.class, 4); verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4); verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5); verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6); @@ -371,7 +391,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class); - RaftActorContext follower1Context = follower1Actor.underlyingActor().getRaftActorContext(); + follower1Context = follower1Actor.underlyingActor().getRaftActorContext(); assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm()); assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex()); assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size()); @@ -380,13 +400,15 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt MessageCollectorActor.clearMessages(follower2CollectorActor); MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class); - RaftActorContext follower2Context = follower2Actor.underlyingActor().getRaftActorContext(); + follower2Context = follower2Actor.underlyingActor().getRaftActorContext(); assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm()); assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex()); assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size()); assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex()); assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex()); + expSnapshotState.add(payload7); + testLog.info("testSecondSnapshot ending"); } @@ -402,6 +424,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt leaderActor.underlyingActor().waitForRecoveryComplete(); + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex()); assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());