import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private RaftActorServerConfigurationSupport serverConfigurationSupport;
+ private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
+
+ private boolean shuttingDown;
+
public RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
switchBehavior(((SwitchBehavior) message));
} else if(message instanceof LeaderTransitioning) {
onLeaderTransitioning();
+ } else if(message instanceof Shutdown) {
+ onShutDown();
+ } else if(message instanceof Runnable) {
+ ((Runnable)message).run();
} else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ LOG.debug("{}: Initiating leader transfer", persistenceId());
+
+ if(leadershipTransferInProgress == null) {
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+ leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ leadershipTransferInProgress = null;
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ leadershipTransferInProgress = null;
+ }
+ });
+
+ leadershipTransferInProgress.addOnComplete(onComplete);
+ leadershipTransferInProgress.init();
+ } else {
+ LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+ leadershipTransferInProgress.addOnComplete(onComplete);
+ }
+ }
+
+ private void onShutDown() {
+ LOG.debug("{}: onShutDown", persistenceId());
+
+ if(shuttingDown) {
+ return;
+ }
+
+ shuttingDown = true;
+ if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }
+ });
+ } else if(currentBehavior.state() == RaftState.Leader) {
+ pauseLeader(new Runnable() {
+ @Override
+ public void run() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ });
+ } else {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ }
+
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
}
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+
+ if(leadershipTransferInProgress != null) {
+ leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
+ }
}
if (roleChangeNotifier.isPresent() &&
return context.getId().equals(currentBehavior.getLeaderId());
}
+ protected boolean isLeaderActive() {
+ return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+ }
+
/**
* Derived actor can call getLeader if they need a reference to the Leader.
* This would be useful for example in forwarding a request to an actor
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
+ /**
+ * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
+ * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
+ * work prior to performing the operation. On completion of any work, the run method must be called to
+ * proceed with the given operation.
+ * <p>
+ * The default implementation immediately runs the operation.
+ *
+ * @param operation the operation to run
+ */
+ protected void pauseLeader(Runnable operation) {
+ operation.run();
+ }
+
protected void onLeaderChanged(String oldLeader, String newLeader){};
private String getLeaderAddress(){
*/
package org.opendaylight.controller.cluster.raft;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
- * A helper class that participates in raft actor leadership transfer. An instance is created upon
+ * A raft actor support class that participates in leadership transfer. An instance is created upon
* initialization of leadership transfer.
* <p>
- * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state.
+ * The transfer process is as follows:
+ * <ol>
+ * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
+ * clients that we no longer have a working leader.</li>
+ * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
+ * their local RoleChangeNotifiers.</li>
+ * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
+ * instance. This allows derived classes to perform work prior to transferring leadership.</li>
+ * <li>When the pause is complete, the {@link #run} method is called which in turn calls
+ * {@link Leader#transferLeadership}.</li>
+ * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
+ * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
+ * possibly complete work that was suspended while we were transferring.</li>
+ * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
+ * </ol>
+ * <p>
+ * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
+ * internal state.
*
* @author Thomas Pantelis
*/
-public abstract class RaftActorLeadershipTransferCohort {
+public class RaftActorLeadershipTransferCohort implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
+
private final RaftActor raftActor;
+ private final ActorRef replyTo;
+ private Cancellable newLeaderTimer;
+ private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
+ private long newLeaderTimeoutInMillis = 2000;
+ private final Stopwatch transferTimer = Stopwatch.createUnstarted();
- protected RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+ RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
this.raftActor = raftActor;
+ this.replyTo = replyTo;
+ }
+
+ void init() {
+ RaftActorContext context = raftActor.getRaftActorContext();
+ RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
+
+ transferTimer.start();
+
+ Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
+ if(roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
+ currentBehavior.getLeaderPayloadVersion()), raftActor.self());
+ }
+
+ LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
+ for(String peerId: context.getPeerIds()) {
+ ActorSelection followerActor = context.getPeerActorSelection(peerId);
+ if(followerActor != null) {
+ followerActor.tell(leaderTransitioning, context.getActor());
+ }
+ }
+
+ raftActor.pauseLeader(this);
}
/**
- * This method is invoked to start leadership transfer.
+ * This method is invoked to run the leadership transfer.
*/
- public void startTransfer() {
+ @Override
+ public void run() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
+ // Sanity check...
if(behavior instanceof Leader) {
((Leader)behavior).transferLeadership(this);
+ } else {
+ LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
+ finish(true);
}
}
/**
- * This method is invoked to abort leadership transfer.
+ * This method is invoked to abort leadership transfer on failure.
*/
public void abortTransfer() {
- transferComplete();
+ LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
+ finish(false);
}
/**
- * This method is invoked when leadership transfer is complete.
+ * This method is invoked when leadership transfer was carried out and complete.
*/
- public abstract void transferComplete();
+ public void transferComplete() {
+ LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
+
+ // We'll give it a little time for the new leader to be elected to give the derived class a
+ // chance to possibly complete work that was suspended while we were transferring. The
+ // RequestVote message from the new leader candidate should cause us to step down as leader
+ // and convert to follower due to higher term. We should then get an AppendEntries heart
+ // beat with the new leader id.
+
+ // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
+ // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
+ // safely run on actor's thread dispatcher.
+ FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
+ newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
+ new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
+ }
+ }, raftActor.getContext().system().dispatcher(), raftActor.self());
+ }
+
+ void onNewLeader(String newLeader) {
+ if(newLeader != null && newLeaderTimer != null) {
+ LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
+ newLeaderTimer.cancel();
+ finish(true);
+ }
+ }
+
+ private void finish(boolean success) {
+ if(transferTimer.isRunning()) {
+ transferTimer.stop();
+ if(success) {
+ LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
+ raftActor.getLeaderId(), transferTimer.toString());
+ } else {
+ LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
+ transferTimer.toString());
+ }
+ }
+
+ for(OnComplete onComplete: onCompleteCallbacks) {
+ if(success) {
+ onComplete.onSuccess(raftActor.self(), replyTo);
+ } else {
+ onComplete.onFailure(raftActor.self(), replyTo);
+ }
+ }
+ }
+
+ void addOnComplete(OnComplete onComplete) {
+ onCompleteCallbacks.add(onComplete);
+ }
+
+ @VisibleForTesting
+ void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+ this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ }
+
+ interface OnComplete {
+ void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
+ void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+ }
}
return returnBehavior;
}
+ /**
+ * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
+ * <ul>
+ * <li>Start a timer (Stopwatch).</li>
+ * <li>Send an initial AppendEntries heartbeat to all followers.</li>
+ * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
+ * <li>If it matches, </li>
+ * <ul>
+ * <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
+ * <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
+ * <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
+ * </ul>
+ * <li>Otherwise if the election time out period elapses, notify
+ * {@link RaftActorLeadershipTransferCohort#abortTtransfer}.</li>
+ * </ul>
+ *
+ * @param leadershipTransferCohort
+ */
public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
- if(!context.hasFollowers()) {
- leadershipTransferCohort.transferComplete();
- return;
- }
-
LOG.debug("{}: Attempting to transfer leadership", logName());
leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
// We can't be sure if the follower has applied all its log entries to its state so send an
- // additional AppendEntries.
+ // additional AppendEntries with the latest commit index.
sendAppendEntries(0, false);
// Now send an ElectionTimeout to the matching follower to immediately start an election.
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.io.Serializable;
+
+/**
+ * Message sent to a raft actor to shutdown gracefully. If it's the leader it will transfer leadership to a
+ * follower. As its last act, the actor self-destructs via a PoisonPill.
+ *
+ * @author Thomas Pantelis
+ */
+public class Shutdown implements Serializable {
+ private static final long serialVersionUID = 1L;
+}
import akka.actor.ActorRef;
import akka.actor.InvalidActorNameException;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
- private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- super(builder().id(id).peerAddresses(peerAddresses).config(config));
- this.collectorActor = collectorActor;
- }
-
- public static Props props(String id, Map<String, String> peerAddresses, ConfigParams config,
- TestActorRef<MessageCollectorActor> collectorActor) {
- return Props.create(TestRaftActor.class, id, peerAddresses, config, collectorActor).
- withDispatcher(Dispatchers.DefaultDispatcherId());
+ private TestRaftActor(Builder builder) {
+ super(builder);
+ this.collectorActor = builder.collectorActor;
}
void startDropMessages(Class<?> msgClass) {
public ActorRef collectorActor() {
return collectorActor;
}
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder extends AbstractBuilder<Builder, TestRaftActor> {
+ private TestActorRef<MessageCollectorActor> collectorActor;
+
+ public Builder collectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+ this.collectorActor = collectorActor;
+ return this;
+ }
+
+ private Builder() {
+ super(TestRaftActor.class);
+ }
+ }
}
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected TestActorRef<TestRaftActor> newTestRaftActor(String id, Map<String, String> peerAddresses,
ConfigParams configParams) {
- TestActorRef<MessageCollectorActor> collectorActor = factory.createTestActor(
+ return newTestRaftActor(id, TestRaftActor.newBuilder().peerAddresses(peerAddresses != null ? peerAddresses :
+ Collections.<String, String>emptyMap()).config(configParams));
+ }
+
+ protected TestActorRef<TestRaftActor> newTestRaftActor(String id, TestRaftActor.Builder builder) {
+ builder.collectorActor(factory.<MessageCollectorActor>createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- factory.generateActorId(id + "-collector"));
+ factory.generateActorId(id + "-collector"))).id(id);
InvalidActorNameException lastEx = null;
for(int i = 0; i < 10; i++) {
try {
- return factory.createTestActor(TestRaftActor.props(id,
- peerAddresses != null ? peerAddresses : Collections.<String, String>emptyMap(),
- configParams, collectorActor), id);
+ return factory.createTestActor(builder.props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
} catch (InvalidActorNameException e) {
lastEx = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
protected String testActorPath(String id){
- return "akka://test/user" + id;
+ return factory.createTestActorPath(id);
}
protected void verifyLeadersTrimmedLog(long lastIndex) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static akka.pattern.Patterns.ask;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests leadership transfer end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
+
+ private TestActorRef<MessageCollectorActor> leaderNotifierActor;
+ private TestActorRef<MessageCollectorActor> follower1NotifierActor;
+ private TestActorRef<MessageCollectorActor> follower2NotifierActor;
+
+ @Test
+ public void testLeaderTransferOnShutDown() throws Throwable {
+ testLog.info("testLeaderTransferOnShutDown starting");
+
+ createRaftActors();
+
+ sendPayloadWithFollower2Lagging();
+
+ sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
+
+ sendShutDown(follower2Actor);
+
+ testLog.info("testLeaderTransferOnShutDown ending");
+ }
+
+ private void sendShutDown(ActorRef actor) throws Exception {
+ testLog.info("sendShutDown for {} starting", actor.path());
+
+ FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+ Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown());
+
+ Boolean stopped = Await.result(stopFuture, duration);
+ assertEquals("Stopped", Boolean.TRUE, stopped);
+
+ testLog.info("sendShutDown for {} ending", actor.path());
+ }
+
+ private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
+ testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
+
+ clearMessages(leaderNotifierActor);
+ clearMessages(follower1NotifierActor);
+ clearMessages(follower2NotifierActor);
+
+ FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+ Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+
+ assertNullLeaderIdChange(leaderNotifierActor);
+ assertNullLeaderIdChange(follower1NotifierActor);
+ assertNullLeaderIdChange(follower2NotifierActor);
+
+ verifyRaftState(follower1Actor, RaftState.Leader);
+
+ Boolean stopped = Await.result(stopFuture, duration);
+ assertEquals("Stopped", Boolean.TRUE, stopped);
+
+ follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+ ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
+ assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
+
+ testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
+ }
+
+ private void sendPayloadWithFollower2Lagging() {
+ testLog.info("sendPayloadWithFollower2Lagging starting");
+
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ sendPayloadData(leaderActor, "zero");
+
+ expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ expectFirstMatching(follower1CollectorActor, ApplyState.class);
+
+ testLog.info("sendPayloadWithFollower2Lagging ending");
+ }
+
+ private void createRaftActors() {
+ testLog.info("createRaftActors starting");
+
+ follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ factory.generateActorId(follower1Id + "-notifier"));
+ follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+ config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
+
+ follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ factory.generateActorId(follower2Id + "-notifier"));
+ follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
+ ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
+ config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
+
+ peerAddresses = ImmutableMap.<String, String>builder().
+ put(follower1Id, follower1Actor.path().toString()).
+ put(follower2Id, follower2Actor.path().toString()).build();
+
+ leaderConfigParams = newLeaderConfigParams();
+ leaderConfigParams.setElectionTimeoutFactor(3);
+ leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+ factory.generateActorId(leaderId + "-notifier"));
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+ config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+
+ follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+ leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+ leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
+ waitUntilLeader(leaderActor);
+
+ testLog.info("createRaftActors starting");
+ }
+
+ private void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
+ Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+ Throwable lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ try {
+ OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+ assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
+ return;
+ } catch (Exception | AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
+
+ private void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
+ LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertNull("Expected null leader Id", change.getLeaderId());
+ }
+
+ @Test
+ public void testLeaderTransferAborted() throws Throwable {
+ testLog.info("testLeaderTransferAborted starting");
+
+ createRaftActors();
+
+ follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+ follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+ sendShutDown(leaderActor);
+
+ testLog.info("testLeaderTransferOnShutDown ending");
+ }
+
+ @Test
+ public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
+ testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
+
+ leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
+ waitUntilLeader(leaderActor);
+
+ sendShutDown(leaderActor);
+
+ testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
+ }
+}
private final byte[] restoreFromSnapshot;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
- protected MockRaftActor(Builder builder) {
+ protected MockRaftActor(AbstractBuilder<?, ?> builder) {
super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
return new Builder();
}
- public static class Builder {
+ public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
private Map<String, String> peerAddresses = Collections.emptyMap();
private String id;
private ConfigParams config;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
private byte[] restoreFromSnapshot;
private Optional<Boolean> persistent = Optional.absent();
+ private final Class<A> actorClass;
- public Builder id(String id) {
+ protected AbstractBuilder(Class<A> actorClass) {
+ this.actorClass = actorClass;
+ }
+
+ @SuppressWarnings("unchecked")
+ private T self() {
+ return (T) this;
+ }
+
+ public T id(String id) {
this.id = id;
- return this;
+ return self();
}
- public Builder peerAddresses(Map<String, String> peerAddresses) {
+ public T peerAddresses(Map<String, String> peerAddresses) {
this.peerAddresses = peerAddresses;
- return this;
+ return self();
}
- public Builder config(ConfigParams config) {
+ public T config(ConfigParams config) {
this.config = config;
- return this;
+ return self();
}
- public Builder dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
+ public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
this.dataPersistenceProvider = dataPersistenceProvider;
- return this;
+ return self();
}
- public Builder roleChangeNotifier(ActorRef roleChangeNotifier) {
+ public T roleChangeNotifier(ActorRef roleChangeNotifier) {
this.roleChangeNotifier = roleChangeNotifier;
- return this;
+ return self();
}
- public Builder snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
+ public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
this.snapshotMessageSupport = snapshotMessageSupport;
- return this;
+ return self();
}
- public Builder restoreFromSnapshot(byte[] restoreFromSnapshot) {
+ public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
this.restoreFromSnapshot = restoreFromSnapshot;
- return this;
+ return self();
}
- public Builder persistent(Optional<Boolean> persistent) {
+ public T persistent(Optional<Boolean> persistent) {
this.persistent = persistent;
- return this;
+ return self();
}
public Props props() {
- return Props.create(MockRaftActor.class, this);
+ return Props.create(actorClass, this);
+ }
+ }
+
+ public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
+ private Builder() {
+ super(MockRaftActor.class);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import akka.dispatch.Dispatchers;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
+
+/**
+ * Unit tests for RaftActorLeadershipTransferCohort.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest {
+ private final TestActorFactory factory = new TestActorFactory(getSystem());
+ private MockRaftActor mockRaftActor;
+ private RaftActorLeadershipTransferCohort cohort;
+ private final OnComplete onComplete = mock(OnComplete.class);
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+ @After
+ public void tearDown() {
+ factory.close();
+ }
+
+ private void setup() {
+ String persistenceId = factory.generateActorId("leader-");
+ mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
+ config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId).underlyingActor();
+ cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null);
+ cohort.addOnComplete(onComplete);
+ mockRaftActor.waitForInitializeBehaviorComplete();
+ }
+
+ @Test
+ public void testOnNewLeader() {
+ setup();
+ cohort.setNewLeaderTimeoutInMillis(20000);
+
+ cohort.onNewLeader("new-leader");
+ verify(onComplete, never()).onSuccess(mockRaftActor.self(), null);
+
+ cohort.transferComplete();
+
+ cohort.onNewLeader(null);
+ verify(onComplete, never()).onSuccess(mockRaftActor.self(), null);
+
+ cohort.onNewLeader("new-leader");
+ verify(onComplete).onSuccess(mockRaftActor.self(), null);
+ }
+
+ @Test
+ public void testNewLeaderTimeout() {
+ setup();
+ cohort.setNewLeaderTimeoutInMillis(200);
+ cohort.transferComplete();
+ verify(onComplete, timeout(3000)).onSuccess(mockRaftActor.self(), null);
+ }
+
+ @Test
+ public void testNotLeaderOnRun() {
+ config.setElectionTimeoutFactor(10000);
+ setup();
+ cohort.run();
+ verify(onComplete).onSuccess(mockRaftActor.self(), null);
+ }
+
+ @Test
+ public void testAbortTransfer() {
+ setup();
+ cohort.abortTransfer();
+ verify(onComplete).onFailure(mockRaftActor.self(), null);
+ }
+}
MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
}
- @Test
- public void testTransferLeadershipWithNoFollowers() {
- logStart("testTransferLeadershipWithNoFollowers");
-
- MockRaftActorContext leaderActorContext = createActorContext();
-
- leader = new Leader(leaderActorContext);
-
- RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
- leader.transferLeadership(mockTransferCohort);
-
- verify(mockTransferCohort).transferComplete();
- }
-
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {