From: Tom Pantelis Date: Wed, 16 Dec 2015 17:37:42 +0000 (-0500) Subject: Add ShutDown message to RaftActor to transfer leader X-Git-Tag: release/beryllium~63 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=4e000b89c3b5ac555cb1e2c39e999a8633b48a96 Add ShutDown message to RaftActor to transfer leader Added a ShutDown message to gracefully stop a RaftActor. If the leader, it attempts leadership transfer as follows: 1) Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier. This will cause the ShardManager to clear its cached leader state and primaryInfoCache. 2) Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to their local RoleChangeNotifiers. 3) Call a protected method, pauseLeader, passing the RaftActorLeadershipTransferCohort. This allows derived classes to perform work prior to transferring leadership. The Shard will wait for current transactions to complete. 4) After pause is complete, the run method on the RaftActorLeadershipTransferCohort is called which in turn calls transferLeadership on the Leader. 5) When transfer is complete, send a PoisonPill to self. If not the leader or has no followers, it just calls pauseLeader and sends a PoisonPill on completion. 
Change-Id: I27fa8a95f260b75516b7e558caea4a1a3255dda3 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 3afaad8572..6851f6aab9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.PoisonPill; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +125,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private RaftActorServerConfigurationSupport serverConfigurationSupport; + private RaftActorLeadershipTransferCohort leadershipTransferInProgress; + + private boolean shuttingDown; + public RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { @@ -253,11 +259,74 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { switchBehavior(((SwitchBehavior) message)); } else if(message instanceof LeaderTransitioning) { onLeaderTransitioning(); + } 
else if(message instanceof Shutdown) { + onShutDown(); + } else if(message instanceof Runnable) { + ((Runnable)message).run(); } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) { switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); } } + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + LOG.debug("{}: Initiating leader transfer", persistenceId()); + + if(leadershipTransferInProgress == null) { + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender()); + leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + leadershipTransferInProgress = null; + } + + @Override + public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + leadershipTransferInProgress = null; + } + }); + + leadershipTransferInProgress.addOnComplete(onComplete); + leadershipTransferInProgress.init(); + } else { + LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); + leadershipTransferInProgress.addOnComplete(onComplete); + } + } + + private void onShutDown() { + LOG.debug("{}: onShutDown", persistenceId()); + + if(shuttingDown) { + return; + } + + shuttingDown = true; + if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) { + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId()); + raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); + } + + @Override + public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); + raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); + } + }); + } else 
if(currentBehavior.state() == RaftState.Leader) { + pauseLeader(new Runnable() { + @Override + public void run() { + self().tell(PoisonPill.getInstance(), self()); + } + }); + } else { + self().tell(PoisonPill.getInstance(), self()); + } + } + private void onLeaderTransitioning() { LOG.debug("{}: onLeaderTransitioning", persistenceId()); Optional roleChangeNotifier = getRoleChangeNotifier(); @@ -361,6 +430,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + + if(leadershipTransferInProgress != null) { + leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); + } } if (roleChangeNotifier.isPresent() && @@ -458,6 +531,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return context.getId().equals(currentBehavior.getLeaderId()); } + protected boolean isLeaderActive() { + return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null; + } + /** * Derived actor can call getLeader if they need a reference to the Leader. * This would be useful for example in forwarding a request to an actor @@ -638,6 +715,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract Optional getRoleChangeNotifier(); + /** + * This method is called prior to operations such as leadership transfer and actor shutdown when the leader + * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current + * work prior to performing the operation. On completion of any work, the run method must be called to + * proceed with the given operation. + *

+ * The default implementation immediately runs the operation. + * + * @param operation the operation to run + */ + protected void pauseLeader(Runnable operation) { + operation.run(); + } + protected void onLeaderChanged(String oldLeader, String newLeader){}; private String getLeaderAddress(){ diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 77678e9f24..623fa4902c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -7,43 +7,174 @@ */ package org.opendaylight.controller.cluster.raft; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** - * A helper class that participates in raft actor leadership transfer. An instance is created upon + * A raft actor support class that participates in leadership transfer. An instance is created upon * initialization of leadership transfer. *

- * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state. + * The transfer process is as follows: + *

    + *
  1. Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify + * clients that we no longer have a working leader.
  2. + *
  3. Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to + * their local RoleChangeNotifiers.
  4. + *
  5. Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort + * instance. This allows derived classes to perform work prior to transferring leadership.
  6. + *
  7. When the pause is complete, the {@link #run} method is called which in turn calls + * {@link Leader#transferLeadership}.
  8. + *
  9. The Leader calls {@link #transferComplete} on successful completion.
  10. + *
  11. Wait a short period of time for the new leader to be elected to give the derived class a chance to + * possibly complete work that was suspended while we were transferring.
  12. + *
  13. On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.
  14. + *
+ *

+ * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify + * internal state. * * @author Thomas Pantelis */ -public abstract class RaftActorLeadershipTransferCohort { +public class RaftActorLeadershipTransferCohort implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); + private final RaftActor raftActor; + private final ActorRef replyTo; + private Cancellable newLeaderTimer; + private final List onCompleteCallbacks = new ArrayList<>(); + private long newLeaderTimeoutInMillis = 2000; + private final Stopwatch transferTimer = Stopwatch.createUnstarted(); - protected RaftActorLeadershipTransferCohort(RaftActor raftActor) { + RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) { this.raftActor = raftActor; + this.replyTo = replyTo; + } + + void init() { + RaftActorContext context = raftActor.getRaftActorContext(); + RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior(); + + transferTimer.start(); + + Optional roleChangeNotifier = raftActor.getRoleChangeNotifier(); + if(roleChangeNotifier.isPresent()) { + roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null, + currentBehavior.getLeaderPayloadVersion()), raftActor.self()); + } + + LeaderTransitioning leaderTransitioning = new LeaderTransitioning(); + for(String peerId: context.getPeerIds()) { + ActorSelection followerActor = context.getPeerActorSelection(peerId); + if(followerActor != null) { + followerActor.tell(leaderTransitioning, context.getActor()); + } + } + + raftActor.pauseLeader(this); } /** - * This method is invoked to start leadership transfer. + * This method is invoked to run the leadership transfer. */ - public void startTransfer() { + @Override + public void run() { RaftActorBehavior behavior = raftActor.getCurrentBehavior(); + // Sanity check... 
if(behavior instanceof Leader) { ((Leader)behavior).transferLeadership(this); + } else { + LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId()); + finish(true); } } /** - * This method is invoked to abort leadership transfer. + * This method is invoked to abort leadership transfer on failure. */ public void abortTransfer() { - transferComplete(); + LOG.debug("{}: leader transfer aborted", raftActor.persistenceId()); + finish(false); } /** - * This method is invoked when leadership transfer is complete. + * This method is invoked when leadership transfer was carried out and complete. */ - public abstract void transferComplete(); + public void transferComplete() { + LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId()); + + // We'll give it a little time for the new leader to be elected to give the derived class a + // chance to possibly complete work that was suspended while we were transferring. The + // RequestVote message from the new leader candidate should cause us to step down as leader + // and convert to follower due to higher term. We should then get an AppendEntries heart + // beat with the new leader id. + + // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new + // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it + // safely run on actor's thread dispatcher. 
+ FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); + newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), + new Runnable() { + @Override + public void run() { + LOG.debug("{}: leader not elected in time", raftActor.persistenceId()); + finish(true); + } + }, raftActor.getContext().system().dispatcher(), raftActor.self()); + } + + void onNewLeader(String newLeader) { + if(newLeader != null && newLeaderTimer != null) { + LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader); + newLeaderTimer.cancel(); + finish(true); + } + } + + private void finish(boolean success) { + if(transferTimer.isRunning()) { + transferTimer.stop(); + if(success) { + LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(), + raftActor.getLeaderId(), transferTimer.toString()); + } else { + LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), + transferTimer.toString()); + } + } + + for(OnComplete onComplete: onCompleteCallbacks) { + if(success) { + onComplete.onSuccess(raftActor.self(), replyTo); + } else { + onComplete.onFailure(raftActor.self(), replyTo); + } + } + } + + void addOnComplete(OnComplete onComplete) { + onCompleteCallbacks.add(onComplete); + } + + @VisibleForTesting + void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { + this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; + } + + interface OnComplete { + void onSuccess(ActorRef raftActorRef, ActorRef replyTo); + void onFailure(ActorRef raftActorRef, ActorRef replyTo); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 6bbb70ce6b..21aad966cb 100644 --- 
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -91,12 +91,25 @@ public class Leader extends AbstractLeader { return returnBehavior; } + /** + * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows: + *

    + *
  • Start a timer (Stopwatch).
  • + *
  • Send an initial AppendEntries heartbeat to all followers.
  • + *
  • On AppendEntriesReply, check if the follower's new match Index matches the leader's last index
  • + *
  • If it matches,
  • + *
      + *
    • Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.
    • + *
    • Send an ElectionTimeout to the follower to immediately start an election.
    • + *
    • Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.
    • + *
    + *
  • Otherwise if the election time out period elapses, notify + * {@link RaftActorLeadershipTransferCohort#abortTransfer}.
  • + *
+ * + * @param leadershipTransferCohort + */ public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) { - if(!context.hasFollowers()) { - leadershipTransferCohort.transferComplete(); - return; - } - LOG.debug("{}: Attempting to transfer leadership", logName()); leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort); @@ -124,7 +137,7 @@ public class Leader extends AbstractLeader { LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName()); // We can't be sure if the follower has applied all its log entries to its state so send an - // additional AppendEntries. + // additional AppendEntries with the latest commit index. sendAppendEntries(0, false); // Now send an ElectionTimeout to the matching follower to immediately start an election. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/Shutdown.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/Shutdown.java new file mode 100644 index 0000000000..930a4b0471 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/Shutdown.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +import java.io.Serializable; + +/** + * Message sent to a raft actor to shutdown gracefully. If it's the leader it will transfer leadership to a + * follower. As its last act, the actor self-destructs via a PoisonPill. 
+ * + * @author Thomas Pantelis + */ +public class Shutdown implements Serializable { + private static final long serialVersionUID = 1L; +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 30ead98cb4..bf55fa7aca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.InvalidActorNameException; import akka.actor.PoisonPill; -import akka.actor.Props; import akka.actor.Terminated; import akka.dispatch.Dispatchers; import akka.testkit.JavaTestKit; @@ -72,16 +71,9 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest private final TestActorRef collectorActor; private final Map, Boolean> dropMessages = new ConcurrentHashMap<>(); - private TestRaftActor(String id, Map peerAddresses, ConfigParams config, - TestActorRef collectorActor) { - super(builder().id(id).peerAddresses(peerAddresses).config(config)); - this.collectorActor = collectorActor; - } - - public static Props props(String id, Map peerAddresses, ConfigParams config, - TestActorRef collectorActor) { - return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor). 
- withDispatcher(Dispatchers.DefaultDispatcherId()); + private TestRaftActor(Builder builder) { + super(builder); + this.collectorActor = builder.collectorActor; } void startDropMessages(Class msgClass) { @@ -147,6 +139,23 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest public ActorRef collectorActor() { return collectorActor; } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends AbstractBuilder { + private TestActorRef collectorActor; + + public Builder collectorActor(TestActorRef collectorActor) { + this.collectorActor = collectorActor; + return this; + } + + private Builder() { + super(TestRaftActor.class); + } + } } protected final Logger testLog = LoggerFactory.getLogger(getClass()); @@ -211,16 +220,19 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected TestActorRef newTestRaftActor(String id, Map peerAddresses, ConfigParams configParams) { - TestActorRef collectorActor = factory.createTestActor( + return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses : + Collections.emptyMap()).config(configParams)); + } + + protected TestActorRef newTestRaftActor(String id, TestRaftActor.Builder builder) { + builder.collectorActor(factory.createTestActor( MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), - factory.generateActorId(id + "-collector")); + factory.generateActorId(id + "-collector"))).id(id); InvalidActorNameException lastEx = null; for(int i = 0; i < 10; i++) { try { - return factory.createTestActor(TestRaftActor.props(id, - peerAddresses != null ? 
peerAddresses : Collections.emptyMap(), - configParams, collectorActor), id); + return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id); } catch (InvalidActorNameException e) { lastEx = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); @@ -309,7 +321,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } protected String testActorPath(String id){ - return "akka://test/user" + id; + return factory.createTestActorPath(id); } protected void verifyLeadersTrimmedLog(long lastIndex) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java new file mode 100644 index 0000000000..3cc330a96a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static akka.pattern.Patterns.ask; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.testkit.TestActorRef; +import akka.util.Timeout; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Tests leadership transfer end-to-end. 
+ * + * @author Thomas Pantelis + */ +public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest { + + private TestActorRef leaderNotifierActor; + private TestActorRef follower1NotifierActor; + private TestActorRef follower2NotifierActor; + + @Test + public void testLeaderTransferOnShutDown() throws Throwable { + testLog.info("testLeaderTransferOnShutDown starting"); + + createRaftActors(); + + sendPayloadWithFollower2Lagging(); + + sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1(); + + sendShutDown(follower2Actor); + + testLog.info("testLeaderTransferOnShutDown ending"); + } + + private void sendShutDown(ActorRef actor) throws Exception { + testLog.info("sendShutDown for {} starting", actor.path()); + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown()); + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + + testLog.info("sendShutDown for {} ending", actor.path()); + } + + private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable { + testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting"); + + clearMessages(leaderNotifierActor); + clearMessages(follower1NotifierActor); + clearMessages(follower2NotifierActor); + + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown()); + + assertNullLeaderIdChange(leaderNotifierActor); + assertNullLeaderIdChange(follower1NotifierActor); + assertNullLeaderIdChange(follower2NotifierActor); + + verifyRaftState(follower1Actor, RaftState.Leader); + + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); + + follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); + ApplyState applyState = 
expectFirstMatching(follower2CollectorActor, ApplyState.class); + assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex()); + + testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending"); + } + + private void sendPayloadWithFollower2Lagging() { + testLog.info("sendPayloadWithFollower2Lagging starting"); + + follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + + sendPayloadData(leaderActor, "zero"); + + expectFirstMatching(leaderCollectorActor, ApplyState.class); + expectFirstMatching(follower1CollectorActor, ApplyState.class); + + testLog.info("sendPayloadWithFollower2Lagging ending"); + } + + private void createRaftActors() { + testLog.info("createRaftActors starting"); + + follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + factory.generateActorId(follower1Id + "-notifier")); + follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses( + ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))). + config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor)); + + follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + factory.generateActorId(follower2Id + "-notifier")); + follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses( + ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())). + config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor)); + + peerAddresses = ImmutableMap.builder(). + put(follower1Id, follower1Actor.path().toString()). 
+ put(follower2Id, follower2Actor.path().toString()).build(); + + leaderConfigParams = newLeaderConfigParams(); + leaderConfigParams.setElectionTimeoutFactor(3); + leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class), + factory.generateActorId(leaderId + "-notifier")); + leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses). + config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor)); + + follower1CollectorActor = follower1Actor.underlyingActor().collectorActor(); + follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); + leaderCollectorActor = leaderActor.underlyingActor().collectorActor(); + + leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + + waitUntilLeader(leaderActor); + + testLog.info("createRaftActors starting"); + } + + private void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable { + Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS); + Throwable lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + try { + OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor, + GetOnDemandRaftState.INSTANCE, timeout), timeout.duration()); + assertEquals("getRaftState", expState.toString(), raftState.getRaftState()); + return; + } catch (Exception | AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } + + private void assertNullLeaderIdChange(TestActorRef notifierActor) { + LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertNull("Expected null leader Id", change.getLeaderId()); + } + + @Test + public void testLeaderTransferAborted() throws Throwable { + testLog.info("testLeaderTransferAborted starting"); + + createRaftActors(); + + 
follower1Actor.underlyingActor().startDropMessages(AppendEntries.class); + follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + + sendShutDown(leaderActor); + + testLog.info("testLeaderTransferOnShutDown ending"); + } + + @Test + public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable { + testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting"); + + leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams())); + waitUntilLeader(leaderActor); + + sendShutDown(leaderActor); + + testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending"); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 38650e834f..550504b400 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -44,7 +44,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, private final byte[] restoreFromSnapshot; final CountDownLatch snapshotCommitted = new CountDownLatch(1); - protected MockRaftActor(Builder builder) { + protected MockRaftActor(AbstractBuilder builder) { super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION); state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); @@ -259,7 +259,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return new Builder(); } - public static class Builder { + public static class AbstractBuilder, A extends MockRaftActor> { private Map peerAddresses = Collections.emptyMap(); private String id; private ConfigParams config; @@ -268,49 +268,65 @@ public class 
MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, private RaftActorSnapshotMessageSupport snapshotMessageSupport; private byte[] restoreFromSnapshot; private Optional persistent = Optional.absent(); + private final Class actorClass; - public Builder id(String id) { + protected AbstractBuilder(Class actorClass) { + this.actorClass = actorClass; + } + + @SuppressWarnings("unchecked") + private T self() { + return (T) this; + } + + public T id(String id) { this.id = id; - return this; + return self(); } - public Builder peerAddresses(Map peerAddresses) { + public T peerAddresses(Map peerAddresses) { this.peerAddresses = peerAddresses; - return this; + return self(); } - public Builder config(ConfigParams config) { + public T config(ConfigParams config) { this.config = config; - return this; + return self(); } - public Builder dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) { + public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) { this.dataPersistenceProvider = dataPersistenceProvider; - return this; + return self(); } - public Builder roleChangeNotifier(ActorRef roleChangeNotifier) { + public T roleChangeNotifier(ActorRef roleChangeNotifier) { this.roleChangeNotifier = roleChangeNotifier; - return this; + return self(); } - public Builder snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) { + public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) { this.snapshotMessageSupport = snapshotMessageSupport; - return this; + return self(); } - public Builder restoreFromSnapshot(byte[] restoreFromSnapshot) { + public T restoreFromSnapshot(byte[] restoreFromSnapshot) { this.restoreFromSnapshot = restoreFromSnapshot; - return this; + return self(); } - public Builder persistent(Optional persistent) { + public T persistent(Optional persistent) { this.persistent = persistent; - return this; + return self(); } public Props props() { - return 
Props.create(MockRaftActor.class, this); + return Props.create(actorClass, this); + } + } + + public static class Builder extends AbstractBuilder { + private Builder() { + super(MockRaftActor.class); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java new file mode 100644 index 0000000000..5680a0b590 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import akka.dispatch.Dispatchers; +import org.junit.After; +import org.junit.Test; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete; + +/** + * Unit tests for RaftActorLeadershipTransferCohort. 
+ * + * @author Thomas Pantelis + */ +public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest { + private final TestActorFactory factory = new TestActorFactory(getSystem()); + private MockRaftActor mockRaftActor; + private RaftActorLeadershipTransferCohort cohort; + private final OnComplete onComplete = mock(OnComplete.class); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + + @After + public void tearDown() { + factory.close(); + } + + private void setup() { + String persistenceId = factory.generateActorId("leader-"); + mockRaftActor = factory.createTestActor(MockRaftActor.builder().id(persistenceId).config( + config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId).underlyingActor(); + cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null); + cohort.addOnComplete(onComplete); + mockRaftActor.waitForInitializeBehaviorComplete(); + } + + @Test + public void testOnNewLeader() { + setup(); + cohort.setNewLeaderTimeoutInMillis(20000); + + cohort.onNewLeader("new-leader"); + verify(onComplete, never()).onSuccess(mockRaftActor.self(), null); + + cohort.transferComplete(); + + cohort.onNewLeader(null); + verify(onComplete, never()).onSuccess(mockRaftActor.self(), null); + + cohort.onNewLeader("new-leader"); + verify(onComplete).onSuccess(mockRaftActor.self(), null); + } + + @Test + public void testNewLeaderTimeout() { + setup(); + cohort.setNewLeaderTimeoutInMillis(200); + cohort.transferComplete(); + verify(onComplete, timeout(3000)).onSuccess(mockRaftActor.self(), null); + } + + @Test + public void testNotLeaderOnRun() { + config.setElectionTimeoutFactor(10000); + setup(); + cohort.run(); + verify(onComplete).onSuccess(mockRaftActor.self(), null); + } + + @Test + public void testAbortTransfer() { + setup(); + cohort.abortTransfer(); + verify(onComplete).onFailure(mockRaftActor.self(), null); + } +} diff --git 
a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index c39c62c727..b51f0a70b1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -2094,20 +2094,6 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); } - @Test - public void testTransferLeadershipWithNoFollowers() { - logStart("testTransferLeadershipWithNoFollowers"); - - MockRaftActorContext leaderActorContext = createActorContext(); - - leader = new Leader(leaderActorContext); - - RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); - leader.transferLeadership(mockTransferCohort); - - verify(mockTransferCohort).transferComplete(); - } - @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception {