}
});
} else if(currentBehavior.state() == RaftState.Leader) {
- pauseLeader(new Runnable() {
+ pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
@Override
- public void run() {
+ protected void doRun() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+
+ @Override
+ protected void doCancel() {
self().tell(PoisonPill.getInstance(), self());
}
});
/**
* This method is called prior to operations such as leadership transfer and actor shutdown when the leader
* must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
- * work prior to performing the operation. On completion of any work, the run method must be called to
- * proceed with the given operation.
+ * work prior to performing the operation. On completion of any work, the run method must be called on the
+ * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+ * this actor's thread dispatcher as as it modifies internal state.
* <p>
* The default implementation immediately runs the operation.
*
*
* @author Thomas Pantelis
*/
-public class RaftActorLeadershipTransferCohort implements Runnable {
+public class RaftActorLeadershipTransferCohort {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
private final RaftActor raftActor;
}
}
- raftActor.pauseLeader(this);
+ raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
+ @Override
+ protected void doRun() {
+ doTransfer();
+ }
+
+ @Override
+ protected void doCancel() {
+ LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
+ abortTransfer();
+ }
+ });
}
/**
- * This method is invoked to run the leadership transfer.
+ * This method is invoked to perform the leadership transfer.
*/
- @Override
- public void run() {
+ @VisibleForTesting
+ void doTransfer() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
if(behavior instanceof Leader) {
// Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
// leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
- // safely run on actor's thread dispatcher.
+ // safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
new Runnable() {
LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
raftActor.getLeaderId(), transferTimer.toString());
} else {
- LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
+ LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
transferTimer.toString());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.Cancellable;
+import com.google.common.base.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * An abstract class that implements a Runnable operation with a timer such that if the run method isn't
+ * invoked within a timeout period, the operation is cancelled via {@link #doCancel}.
+ * <p>
+ * <b>Note:</b> this class is not thread safe and is intended for use only within the context of the same
+ * actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it
+ * modifies internal state.
+ *
+ * @author Thomas Pantelis
+ */
+abstract class TimedRunnable implements Runnable {
+ private final Cancellable cancelTimer;
+ private boolean canRun = true;
+
+ TimedRunnable(FiniteDuration timeout, RaftActor actor) {
+ Preconditions.checkNotNull(timeout);
+ Preconditions.checkNotNull(actor);
+ cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), new Runnable() {
+ @Override
+ public void run() {
+ cancel();
+ }
+ }, actor.getContext().system().dispatcher(), actor.self());
+ }
+
+ @Override
+ public void run() {
+ if(canRun) {
+ canRun = false;
+ cancelTimer.cancel();
+ doRun();
+ }
+ }
+
+ private void cancel() {
+ canRun = false;
+ doCancel();
+ }
+
+ /**
+ * Overridden to perform the operation if not previously cancelled or run.
+ */
+ protected abstract void doRun();
+
+ /**
+ * Overridden to cancel the operation on time out.
+ */
+ protected abstract void doCancel();
+}
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
import akka.actor.Props;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayInputStream;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
public static final short PAYLOAD_VERSION = 5;
final RaftActor actorDelegate;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
private final byte[] restoreFromSnapshot;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
+ private final Function<Runnable, Void> pauseLeaderFunction;
protected MockRaftActor(AbstractBuilder<?, ?> builder) {
super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
roleChangeNotifier = builder.roleChangeNotifier;
snapshotMessageSupport = builder.snapshotMessageSupport;
restoreFromSnapshot = builder.restoreFromSnapshot;
+ pauseLeaderFunction = builder.pauseLeaderFunction;
}
public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
}
}
+ @Override
+ protected void pauseLeader(Runnable operation) {
+ if(pauseLeaderFunction != null) {
+ pauseLeaderFunction.apply(operation);
+ } else {
+ super.pauseLeader(operation);
+ }
+ }
+
public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
private byte[] restoreFromSnapshot;
private Optional<Boolean> persistent = Optional.absent();
private final Class<A> actorClass;
+ private Function<Runnable, Void> pauseLeaderFunction;
protected AbstractBuilder(Class<A> actorClass) {
this.actorClass = actorClass;
return self();
}
+ public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
+ this.pauseLeaderFunction = pauseLeaderFunction;
+ return self();
+ }
+
public Props props() {
return Props.create(actorClass, this);
}
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import akka.dispatch.Dispatchers;
+import com.google.common.base.Function;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
private MockRaftActor mockRaftActor;
private RaftActorLeadershipTransferCohort cohort;
private final OnComplete onComplete = mock(OnComplete.class);
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ private final DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ private Function<Runnable, Void> pauseLeaderFunction;
@After
public void tearDown() {
private void setup() {
String persistenceId = factory.generateActorId("leader-");
mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
- config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId).underlyingActor();
+ config).pauseLeaderFunction(pauseLeaderFunction).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ persistenceId).underlyingActor();
cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null);
cohort.addOnComplete(onComplete);
mockRaftActor.waitForInitializeBehaviorComplete();
public void testNotLeaderOnRun() {
config.setElectionTimeoutFactor(10000);
setup();
- cohort.run();
+ cohort.doTransfer();
verify(onComplete).onSuccess(mockRaftActor.self(), null);
}
cohort.abortTransfer();
verify(onComplete).onFailure(mockRaftActor.self(), null);
}
+
+ @Test
+ public void testPauseLeaderTimeout() {
+ pauseLeaderFunction = new Function<Runnable, Void>() {
+ @Override
+ public Void apply(Runnable input) {
+ return null;
+ }
+ };
+
+ setup();
+ cohort.init();
+ verify(onComplete, timeout(2000)).onFailure(mockRaftActor.self(), null);
+ }
}