X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=d9a5487e556171813bf47d49af2a18a85b0b165c;hp=04ae7ffe8ce77f4e6634f1a672013d79b9d01764;hb=refs%2Fchanges%2F09%2F83009%2F6;hpb=e9fc7e7ed2b13d274518d6a872ab67749ef4507a diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 04ae7ffe8c..d9a5487e55 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; @@ -23,13 +22,13 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; +import akka.protobuf.ByteString; import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; -import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; @@ -37,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.SerializationUtils; @@ -104,7 +104,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleMessageForUnknownMessage() throws Exception { + public void testHandleMessageForUnknownMessage() { logStart("testHandleMessageForUnknownMessage"); leader = new Leader(createActorContext()); @@ -114,7 +114,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception { + public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -163,18 +163,20 @@ public class LeaderTest extends AbstractLeaderTest { return sendReplicate(actorContext, 1, index); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, + final long index) { return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index, + final Payload payload) { SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); } @Test - public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { + public void testHandleReplicateMessageSendAppendEntriesToFollower() { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -211,7 +213,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() { logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -266,7 +268,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() { logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -304,7 +306,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -344,7 +346,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() { logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -375,26 +377,47 @@ public class LeaderTest extends AbstractLeaderTest { sendReplicate(actorContext, lastIndex + i + 1); leader.handleMessage(followerActor, new AppendEntriesReply( FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); - } - for (int i = 3; i < 5; i++) { - sendReplicate(actorContext, lastIndex + i + 1); + // We are expecting six messages here -- a request to replicate and a consensus-reached message + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request/consensus appends collected", 6, allMessages.size()); + for (int i = 0; i < 3; i++) { + assertRequestEntry(lastIndex, allMessages, i); + assertCommitEntry(lastIndex, allMessages, i); } - List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); - // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would - // get sent to the follower - but not the 5th - assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + // Now perform another commit, eliciting a request to persist + sendReplicate(actorContext, lastIndex + 3 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // This elicits another message for request to replicate + assertEquals("The number of request entries collected", 7, allMessages.size()); + assertRequestEntry(lastIndex, allMessages, 3); - for (int i = 0; i < 4; i++) { - long expected = allMessages.get(i).getEntries().get(0).getIndex(); - assertEquals(expected, i + 2); - } + sendReplicate(actorContext, lastIndex + 4 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request entries collected", 7, allMessages.size()); + } + + private static void assertCommitEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries commitReq = allMessages.get(2 * messageNr + 1); + assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit()); + assertEquals(ImmutableList.of(), commitReq.getEntries()); + } + + private static void assertRequestEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries req = allMessages.get(2 * messageNr); + assertEquals(lastIndex + messageNr, req.getLeaderCommit()); + + final List entries = req.getEntries(); + assertEquals(1, entries.size()); + assertEquals(messageNr + 2, entries.get(0).getIndex()); } @Test - public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() { logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -439,7 +462,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() { logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -476,7 +499,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() { logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -516,7 +539,7 @@ public class LeaderTest extends AbstractLeaderTest { @Test - public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { + public void testHandleReplicateMessageWhenThereAreNoFollowers() { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); MockRaftActorContext actorContext = createActorContext(); @@ -624,7 +647,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendAppendEntriesSnapshotScenario() throws Exception { + public void testSendAppendEntriesSnapshotScenario() { logStart("testSendAppendEntriesSnapshotScenario"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -673,7 +696,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testInitiateInstallSnapshot() throws Exception { + public void testInitiateInstallSnapshot() { logStart("testInitiateInstallSnapshot"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -747,7 +770,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); - AtomicReference> installSnapshotStream = new AtomicReference<>(); + AtomicReference> installSnapshotStream = new AtomicReference<>(); actorContext.setCreateSnapshotProcedure(installSnapshotStream::set); leader = new Leader(actorContext); @@ -775,7 +798,8 @@ public class LeaderTest extends AbstractLeaderTest { // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets // installed with a SendInstallSnapshot - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -795,7 +819,8 @@ public class LeaderTest extends AbstractLeaderTest { assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. @@ -807,13 +832,14 @@ public class LeaderTest extends AbstractLeaderTest { // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. MessageCollectorActor.clearMessages(followerActor); - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } @Test - public void testInstallSnapshot() throws Exception { + public void testInstallSnapshot() { logStart("testInstallSnapshot"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -868,7 +894,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testForceInstallSnapshot() throws Exception { + public void testForceInstallSnapshot() { logStart("testForceInstallSnapshot"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -983,7 +1009,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendSnapshotfromInstallSnapshotReply() throws Exception { + public void testSendSnapshotfromInstallSnapshotReply() { logStart("testSendSnapshotfromInstallSnapshotReply"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1061,7 +1087,7 @@ public class LeaderTest extends AbstractLeaderTest { @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() { logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1126,7 +1152,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { + public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() { logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1173,7 +1199,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, - installSnapshot.getLastChunkHashCode().get().intValue()); + installSnapshot.getLastChunkHashCode().getAsInt()); final int hashCode = Arrays.hashCode(installSnapshot.getData()); @@ -1186,7 +1212,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt()); } @Test @@ -1271,7 +1297,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { + public void testLeaderCreatedWithCommitIndexLessThanLastIndex() { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); final MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1326,7 +1352,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { + public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() { logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex"); final MockRaftActorContext leaderActorContext = createActorContext(); @@ -1696,7 +1722,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplySuccess() throws Exception { + public void testHandleAppendEntriesReplySuccess() { logStart("testHandleAppendEntriesReplySuccess"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1940,7 +1966,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + public void testIsolatedLeaderCheckTwoFollowers() { logStart("testIsolatedLeaderCheckTwoFollowers"); RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); @@ -1950,7 +1976,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { + public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() { logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); @@ -1960,7 +1986,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLaggingFollowerStarvation() throws Exception { + public void testLaggingFollowerStarvation() { logStart("testLaggingFollowerStarvation"); String leaderActorId = actorFactory.generateActorId("leader"); @@ -2112,7 +2138,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); - doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId(); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); @@ -2143,7 +2169,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); - doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2175,7 +2201,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); - doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2376,6 +2402,59 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); } + @Test + public void testLeaderAddressInAppendEntries() { + logStart("testLeaderAddressInAppendEntries"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + FiniteDuration.create(50, TimeUnit.MILLISECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver( + peerId -> leaderActor.path().toString()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat shouldn't have the leader address + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertFalse(appendEntries.getLeaderAddress().isPresent()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower needs the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true, + RaftVersions.CURRENT_VERSION)); + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertTrue(appendEntries.getLeaderAddress().isPresent()); + assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower does not need the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false, + RaftVersions.CURRENT_VERSION)); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertFalse(appendEntries.getLeaderAddress().isPresent()); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, final ActorRef actorRef, final RaftRPC rpc) {