From 135129e0cca66040fd512fab740d59b2ab1f8382 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 29 Dec 2015 06:52:45 -0500 Subject: [PATCH] Implement pauseLeader timeout for leadership transfer Added an abstract TimedRunnable class that implements a Runnable operation with a timer such that if the run method isn't invoked within a timeout period, the operation is cancelled. The RaftActorLeadershipTransferCohort passes a TimedRunnable instance to pauseLeader to abort the transfer if pauseLeader doesn't complete within an election timeout period. Change-Id: I773605117dc4e310f3ee5025c0131b9f1447c746 Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 14 +++-- .../RaftActorLeadershipTransferCohort.java | 25 +++++--- .../cluster/raft/TimedRunnable.java | 62 +++++++++++++++++++ .../cluster/raft/MockRaftActor.java | 19 +++++- ...RaftActorLeadershipTransferCohortTest.java | 23 ++++++- 5 files changed, 128 insertions(+), 15 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 610a7d8f2f..2bd75923d9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -316,9 +316,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } }); } else if(currentBehavior.state() == RaftState.Leader) { - pauseLeader(new Runnable() { + pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override - public void run() { + protected void doRun() { + self().tell(PoisonPill.getInstance(), self()); + } + + @Override + protected void doCancel() { self().tell(PoisonPill.getInstance(), self()); } }); @@ -723,8 +728,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * This method is called prior to operations such as leadership transfer and actor shutdown when the leader * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current - * work prior to performing the operation. On completion of any work, the run method must be called to - * proceed with the given operation. + * work prior to performing the operation. On completion of any work, the run method must be called on the + * given Runnable to proceed with the given operation. Important: the run method must be called on + * this actor's thread dispatcher as as it modifies internal state. *

* The default implementation immediately runs the operation. * diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 7105714b0b..b83bfd3709 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -48,7 +48,7 @@ import scala.concurrent.duration.FiniteDuration; * * @author Thomas Pantelis */ -public class RaftActorLeadershipTransferCohort implements Runnable { +public class RaftActorLeadershipTransferCohort { private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); private final RaftActor raftActor; @@ -84,14 +84,25 @@ public class RaftActorLeadershipTransferCohort implements Runnable { } } - raftActor.pauseLeader(this); + raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) { + @Override + protected void doRun() { + doTransfer(); + } + + @Override + protected void doCancel() { + LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId()); + abortTransfer(); + } + }); } /** - * This method is invoked to run the leadership transfer. + * This method is invoked to perform the leadership transfer. */ - @Override - public void run() { + @VisibleForTesting + void doTransfer() { RaftActorBehavior behavior = raftActor.getCurrentBehavior(); // Sanity check... if(behavior instanceof Leader) { @@ -125,7 +136,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable { // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it - // safely run on actor's thread dispatcher. + // safely run on the actor's thread dispatcher. FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), new Runnable() { @@ -153,7 +164,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable { LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(), raftActor.getLeaderId(), transferTimer.toString()); } else { - LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), + LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer.toString()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java new file mode 100644 index 0000000000..933b134341 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.Cancellable; +import com.google.common.base.Preconditions; +import scala.concurrent.duration.FiniteDuration; + +/** + * An abstract class that implements a Runnable operation with a timer such that if the run method isn't + * invoked within a timeout period, the operation is cancelled via {@link #doCancel}. + *

+ * Note: this class is not thread safe and is intended for use only within the context of the same + * actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it + * modifies internal state. + * + * @author Thomas Pantelis + */ +abstract class TimedRunnable implements Runnable { + private final Cancellable cancelTimer; + private boolean canRun = true; + + TimedRunnable(FiniteDuration timeout, RaftActor actor) { + Preconditions.checkNotNull(timeout); + Preconditions.checkNotNull(actor); + cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), new Runnable() { + @Override + public void run() { + cancel(); + } + }, actor.getContext().system().dispatcher(), actor.self()); + } + + @Override + public void run() { + if(canRun) { + canRun = false; + cancelTimer.cancel(); + doRun(); + } + } + + private void cancel() { + canRun = false; + doCancel(); + } + + /** + * Overridden to perform the operation if not previously cancelled or run. + */ + protected abstract void doRun(); + + /** + * Overridden to cancel the operation on time out. + */ + protected abstract void doCancel(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 550504b400..c1a87e8c7b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.Props; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.Uninterruptibles; import java.io.ByteArrayInputStream; @@ -29,7 +30,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { - public static final short PAYLOAD_VERSION = 5; final RaftActor actorDelegate; @@ -43,6 +43,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, private RaftActorSnapshotMessageSupport snapshotMessageSupport; private final byte[] restoreFromSnapshot; final CountDownLatch snapshotCommitted = new CountDownLatch(1); + private final Function pauseLeaderFunction; protected MockRaftActor(AbstractBuilder builder) { super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION); @@ -60,6 +61,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, roleChangeNotifier = builder.roleChangeNotifier; snapshotMessageSupport = builder.snapshotMessageSupport; restoreFromSnapshot = builder.restoreFromSnapshot; + pauseLeaderFunction = builder.pauseLeaderFunction; } public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) { @@ -216,6 +218,15 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, } } + @Override + protected void pauseLeader(Runnable operation) { + if(pauseLeaderFunction != null) { + pauseLeaderFunction.apply(operation); + } else { + super.pauseLeader(operation); + } + } + public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; @@ -269,6 +280,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, private byte[] restoreFromSnapshot; private Optional persistent = Optional.absent(); private final Class actorClass; + private Function pauseLeaderFunction; protected AbstractBuilder(Class actorClass) { this.actorClass = actorClass; @@ -319,6 +331,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, return self(); } + public T pauseLeaderFunction(Function pauseLeaderFunction) { + this.pauseLeaderFunction = pauseLeaderFunction; + return self(); + } + public Props props() { return Props.create(actorClass, this); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java index 5680a0b590..c7d9e3dfb8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java @@ -12,6 +12,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import akka.dispatch.Dispatchers; +import com.google.common.base.Function; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete; @@ -26,7 +27,8 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest { private MockRaftActor mockRaftActor; private RaftActorLeadershipTransferCohort cohort; private final OnComplete onComplete = mock(OnComplete.class); - DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + private final DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + private Function pauseLeaderFunction; @After public void tearDown() { @@ -36,7 +38,8 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest { private void setup() { String persistenceId = factory.generateActorId("leader-"); mockRaftActor = factory.createTestActor(MockRaftActor.builder().id(persistenceId).config( - config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId).underlyingActor(); + config).pauseLeaderFunction(pauseLeaderFunction).props().withDispatcher(Dispatchers.DefaultDispatcherId()), + persistenceId).underlyingActor(); cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null); cohort.addOnComplete(onComplete); mockRaftActor.waitForInitializeBehaviorComplete(); @@ -71,7 +74,7 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest { public void testNotLeaderOnRun() { config.setElectionTimeoutFactor(10000); setup(); - cohort.run(); + cohort.doTransfer(); verify(onComplete).onSuccess(mockRaftActor.self(), null); } @@ -81,4 +84,18 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest { cohort.abortTransfer(); verify(onComplete).onFailure(mockRaftActor.self(), null); } + + @Test + public void testPauseLeaderTimeout() { + pauseLeaderFunction = new Function() { + @Override + public Void apply(Runnable input) { + return null; + } + }; + + setup(); + cohort.init(); + verify(onComplete, timeout(2000)).onFailure(mockRaftActor.self(), null); + } } -- 2.36.6