*/
package org.opendaylight.controller.cluster.raft;
-import static akka.pattern.Patterns.ask;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+
import akka.actor.ActorRef;
-import akka.actor.Props;
+import akka.actor.Status;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
+import akka.testkit.javadsl.TestKit;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.Await;
import scala.concurrent.Future;
public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
private final String follower3Id = factory.generateActorId("follower");
- private TestActorRef<MessageCollectorActor> leaderNotifierActor;
- private TestActorRef<MessageCollectorActor> follower1NotifierActor;
- private TestActorRef<MessageCollectorActor> follower2NotifierActor;
- private TestActorRef<MessageCollectorActor> follower3NotifierActor;
+ private ActorRef leaderNotifierActor;
+ private ActorRef follower1NotifierActor;
+ private ActorRef follower2NotifierActor;
+ private ActorRef follower3NotifierActor;
private TestActorRef<TestRaftActor> follower3Actor;
private ActorRef follower3CollectorActor;
+ private ActorRef requestLeadershipResultCollectorActor;
@Test
- public void testLeaderTransferOnShutDown() throws Throwable {
+ public void testLeaderTransferOnShutDown() throws Exception {
testLog.info("testLeaderTransferOnShutDown starting");
createRaftActors();
testLog.info("testLeaderTransferOnShutDown ending");
}
- private void sendShutDown(ActorRef actor) throws Exception {
+ private void sendShutDown(final ActorRef actor) throws Exception {
testLog.info("sendShutDown for {} starting", actor.path());
FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
- Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown());
+ Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
Boolean stopped = Await.result(stopFuture, duration);
assertEquals("Stopped", Boolean.TRUE, stopped);
testLog.info("sendShutDown for {} ending", actor.path());
}
- private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
+ private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
clearMessages(leaderNotifierActor);
clearMessages(follower2NotifierActor);
clearMessages(follower3NotifierActor);
- FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
- Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+ // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
+ final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
+ follower2Instance.startDropMessages(LeaderTransitioning.class);
- assertNullLeaderIdChange(leaderNotifierActor);
- assertNullLeaderIdChange(follower1NotifierActor);
- assertNullLeaderIdChange(follower2NotifierActor);
- assertNullLeaderIdChange(follower3NotifierActor);
+ FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
verifyRaftState(follower1Actor, RaftState.Leader);
Boolean stopped = Await.result(stopFuture, duration);
assertEquals("Stopped", Boolean.TRUE, stopped);
- follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+ // Re-enable LeaderTransitioning messages to follower2.
+ final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
+ LeaderTransitioning.class);
+ follower2Instance.stopDropMessages(LeaderTransitioning.class);
+
+ follower2Instance.stopDropMessages(AppendEntries.class);
ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
+ // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
+ follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
+
+ verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
+ verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
+ // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
+ // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
+ verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
+ verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
+
testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
}
private void createRaftActors() {
testLog.info("createRaftActors starting");
- follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1,
+ 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
+ List.of(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
+ new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
+
+ InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
+ InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
+ InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
+ InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
+
+ follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
factory.generateActorId(follower1Id + "-notifier"));
follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
- follower3Id, testActorPath(follower3Id))).
- config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
+ Map.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
+ follower3Id, testActorPath(follower3Id)))
+ .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
- follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ follower2NotifierActor = factory.createActor(MessageCollectorActor.props(),
factory.generateActorId(follower2Id + "-notifier"));
follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
- follower3Id, testActorPath(follower3Id))).
- config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
+ Map.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
+ follower3Id, testActorPath(follower3Id)))
+ .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
- follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ follower3NotifierActor = factory.createActor(MessageCollectorActor.props(),
factory.generateActorId(follower3Id + "-notifier"));
follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
- ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
- follower2Id, follower2Actor.path().toString())).
- config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
+ Map.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
+ follower2Id, follower2Actor.path().toString()))
+ .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
- peerAddresses = ImmutableMap.<String, String>builder().
- put(follower1Id, follower1Actor.path().toString()).
- put(follower2Id, follower2Actor.path().toString()).
- put(follower3Id, follower3Actor.path().toString()).build();
+ peerAddresses = Map.of(
+ follower1Id, follower1Actor.path().toString(),
+ follower2Id, follower2Actor.path().toString(),
+ follower3Id, follower3Actor.path().toString());
leaderConfigParams = newLeaderConfigParams();
leaderConfigParams.setElectionTimeoutFactor(3);
- leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
factory.generateActorId(leaderId + "-notifier"));
- leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
- config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
+ .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
leaderContext = leaderActor.underlyingActor().getRaftActorContext();
- leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
waitUntilLeader(leaderActor);
testLog.info("createRaftActors starting");
}
- private static void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
- Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
- Throwable lastError = null;
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- try {
- OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
- GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
- assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
- return;
- } catch (Exception | AssertionError e) {
- lastError = e;
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
- }
-
- throw lastError;
+ private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) {
+ verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
}
- private static void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
- LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
- assertNull("Expected null leader Id", change.getLeaderId());
+ private static void verifyLeaderStateChangedMessages(final ActorRef notifierActor,
+ final String... expLeaderIds) {
+ List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
+ expLeaderIds.length);
+
+ Collections.reverse(leaderStateChanges);
+ Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
+ for (int i = expLeaderIds.length - 1; i >= 0; i--) {
+ assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
+ }
}
@Test
- public void testLeaderTransferAborted() throws Throwable {
+ public void testLeaderTransferAborted() throws Exception {
testLog.info("testLeaderTransferAborted starting");
createRaftActors();
- follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
- follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+ leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
sendShutDown(leaderActor);
}
@Test
- public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
+ public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
}
+
+ private void sendFollower2RequestLeadershipTransferToLeader() {
+ testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
+
+ leaderActor.tell(
+ new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
+
+ testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
+ }
+
+ private void createRequestLeadershipResultCollectorActor() {
+ testLog.info("createRequestLeadershipResultCollectorActor starting");
+
+ requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
+
+ testLog.info("createRequestLeadershipResultCollectorActor ending");
+ }
+
+ @Test
+ public void testSuccessfulRequestLeadershipTransferToFollower2() {
+ testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ verifyRaftState(follower2Actor, RaftState.Leader);
+
+ expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
+
+ testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
+ }
+
+ @Test
+ public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ sendPayloadWithFollower2Lagging();
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ verifyRaftState(follower1Actor, RaftState.Follower);
+ verifyRaftState(follower2Actor, RaftState.Follower);
+ verifyRaftState(follower3Actor, RaftState.Follower);
+
+ Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+ assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
+ }
+
+
+ @Test
+ public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ sendShutDown(follower2Actor);
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ verifyRaftState(follower1Actor, RaftState.Follower);
+ verifyRaftState(follower3Actor, RaftState.Follower);
+
+ Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
+ assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
+
+ testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
+ }
+
+ @Test
+ public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() {
+ testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
+
+ createRaftActors();
+ createRequestLeadershipResultCollectorActor();
+
+ factory.killActor(follower1Actor, new TestKit(getSystem()));
+ factory.killActor(follower3Actor, new TestKit(getSystem()));
+
+ sendFollower2RequestLeadershipTransferToLeader();
+
+ expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
+
+ verifyRaftState(follower2Actor, RaftState.Leader);
+ verifyRaftState(leaderActor, RaftState.Follower);
+
+ testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");
+ }
}