Implement pauseLeader timeout for leadership transfer 42/31942/6
authorTom Pantelis <tpanteli@brocade.com>
Tue, 29 Dec 2015 11:52:45 +0000 (06:52 -0500)
committerAnil Vishnoi <vishnoianil@gmail.com>
Thu, 7 Jan 2016 02:39:38 +0000 (02:39 +0000)
Added an abstract TimedRunnable class that implements a Runnable operation
with a timer such that if the run method isn't invoked within a timeout
period, the operation is cancelled. The
RaftActorLeadershipTransferCohort passes a TimedRunnable instance to
pauseLeader to abort the transfer if pauseLeader doesn't complete within
an election timeout period.

Change-Id: I773605117dc4e310f3ee5025c0131b9f1447c746
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohortTest.java

index 610a7d8f2f886d6ad94c8c80b0a8ca633e02214a..2bd75923d9a06c24a34f506ab2b4574263ab6f0a 100644 (file)
@@ -316,9 +316,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 }
             });
         } else if(currentBehavior.state() == RaftState.Leader) {
                 }
             });
         } else if(currentBehavior.state() == RaftState.Leader) {
-            pauseLeader(new Runnable() {
+            pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
                 @Override
-                public void run() {
+                protected void doRun() {
+                    self().tell(PoisonPill.getInstance(), self());
+                }
+
+                @Override
+                protected void doCancel() {
                     self().tell(PoisonPill.getInstance(), self());
                 }
             });
                     self().tell(PoisonPill.getInstance(), self());
                 }
             });
@@ -723,8 +728,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
     /**
      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
-     * work prior to performing the operation. On completion of any work, the run method must be called to
-     * proceed with the given operation.
+     * work prior to performing the operation. On completion of any work, the run method must be called on the
+     * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+     * this actor's thread dispatcher as as it modifies internal state.
      * <p>
      * The default implementation immediately runs the operation.
      *
      * <p>
      * The default implementation immediately runs the operation.
      *
index 7105714b0ba8e807fd7cc137b928a3ad0a875539..b83bfd370948dc18a3826633568e2a78eeb650ac 100644 (file)
@@ -48,7 +48,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * @author Thomas Pantelis
  */
  *
  * @author Thomas Pantelis
  */
-public class RaftActorLeadershipTransferCohort implements Runnable {
+public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
     private final RaftActor raftActor;
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
     private final RaftActor raftActor;
@@ -84,14 +84,25 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
             }
         }
 
             }
         }
 
-        raftActor.pauseLeader(this);
+        raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
+            @Override
+            protected void doRun() {
+                doTransfer();
+            }
+
+            @Override
+            protected void doCancel() {
+                LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
+                abortTransfer();
+            }
+        });
     }
 
     /**
     }
 
     /**
-     * This method is invoked to run the leadership transfer.
+     * This method is invoked to perform the leadership transfer.
      */
      */
-    @Override
-    public void run() {
+    @VisibleForTesting
+    void doTransfer() {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
         // Sanity check...
         if(behavior instanceof Leader) {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
         // Sanity check...
         if(behavior instanceof Leader) {
@@ -125,7 +136,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
 
         // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
         // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
 
         // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
         // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
-        // safely run on actor's thread dispatcher.
+        // safely run on the actor's thread dispatcher.
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
                 new Runnable() {
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
                 new Runnable() {
@@ -153,7 +164,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
                         raftActor.getLeaderId(), transferTimer.toString());
             } else {
                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
                         raftActor.getLeaderId(), transferTimer.toString());
             } else {
-                LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
+                LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
                         transferTimer.toString());
             }
         }
                         transferTimer.toString());
             }
         }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/TimedRunnable.java
new file mode 100644 (file)
index 0000000..933b134
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.Cancellable;
+import com.google.common.base.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * An abstract class that implements a Runnable operation with a timer such that if the run method isn't
+ * invoked within a timeout period, the operation is cancelled via {@link #doCancel}.
+ * <p>
+ * <b>Note:</b> this class is not thread safe and is intended for use only within the context of the same
+ * actor that's passed on construction. The run method must be called on this actor's thread dispatcher as it
+ * modifies internal state.
+ *
+ * @author Thomas Pantelis
+ */
+abstract class TimedRunnable implements Runnable {
+    private final Cancellable cancelTimer;
+    private boolean canRun = true;
+
+    TimedRunnable(FiniteDuration timeout, RaftActor actor) {
+        Preconditions.checkNotNull(timeout);
+        Preconditions.checkNotNull(actor);
+        cancelTimer = actor.getContext().system().scheduler().scheduleOnce(timeout, actor.self(), new Runnable() {
+            @Override
+            public void run() {
+                cancel();
+            }
+        }, actor.getContext().system().dispatcher(), actor.self());
+    }
+
+    @Override
+    public void run() {
+        if(canRun) {
+            canRun = false;
+            cancelTimer.cancel();
+            doRun();
+        }
+    }
+
+    private void cancel() {
+        canRun = false;
+        doCancel();
+    }
+
+    /**
+     * Overridden to perform the operation if not previously cancelled or run.
+     */
+    protected abstract void doRun();
+
+    /**
+     * Overridden to cancel the operation on time out.
+     */
+    protected abstract void doCancel();
+}
index 550504b4006161713ae8d5e166ac9bf169c2417e..c1a87e8c7b283e1f83b9114471dba911426127cb 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.ByteArrayInputStream;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.ByteArrayInputStream;
@@ -29,7 +30,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
-
     public static final short PAYLOAD_VERSION = 5;
 
     final RaftActor actorDelegate;
     public static final short PAYLOAD_VERSION = 5;
 
     final RaftActor actorDelegate;
@@ -43,6 +43,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
     private final byte[] restoreFromSnapshot;
     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
     private final byte[] restoreFromSnapshot;
     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
+    private final Function<Runnable, Void> pauseLeaderFunction;
 
     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
         super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
 
     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
         super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
@@ -60,6 +61,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         roleChangeNotifier = builder.roleChangeNotifier;
         snapshotMessageSupport = builder.snapshotMessageSupport;
         restoreFromSnapshot = builder.restoreFromSnapshot;
         roleChangeNotifier = builder.roleChangeNotifier;
         snapshotMessageSupport = builder.snapshotMessageSupport;
         restoreFromSnapshot = builder.restoreFromSnapshot;
+        pauseLeaderFunction = builder.pauseLeaderFunction;
     }
 
     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
     }
 
     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
@@ -216,6 +218,15 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         }
     }
 
         }
     }
 
+    @Override
+    protected void pauseLeader(Runnable operation) {
+        if(pauseLeaderFunction != null) {
+            pauseLeaderFunction.apply(operation);
+        } else {
+            super.pauseLeader(operation);
+        }
+    }
+
     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
@@ -269,6 +280,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         private byte[] restoreFromSnapshot;
         private Optional<Boolean> persistent = Optional.absent();
         private final Class<A> actorClass;
         private byte[] restoreFromSnapshot;
         private Optional<Boolean> persistent = Optional.absent();
         private final Class<A> actorClass;
+        private Function<Runnable, Void> pauseLeaderFunction;
 
         protected AbstractBuilder(Class<A> actorClass) {
             this.actorClass = actorClass;
 
         protected AbstractBuilder(Class<A> actorClass) {
             this.actorClass = actorClass;
@@ -319,6 +331,11 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
             return self();
         }
 
             return self();
         }
 
+        public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
+            this.pauseLeaderFunction = pauseLeaderFunction;
+            return self();
+        }
+
         public Props props() {
             return Props.create(actorClass, this);
         }
         public Props props() {
             return Props.create(actorClass, this);
         }
index 5680a0b5901b3be485ce42ecc32168f18c608775..c7d9e3dfb8737b795eaf1b29d23499291806d225 100644 (file)
@@ -12,6 +12,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.dispatch.Dispatchers;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.dispatch.Dispatchers;
+import com.google.common.base.Function;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
@@ -26,7 +27,8 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest {
     private MockRaftActor mockRaftActor;
     private RaftActorLeadershipTransferCohort cohort;
     private final OnComplete onComplete = mock(OnComplete.class);
     private MockRaftActor mockRaftActor;
     private RaftActorLeadershipTransferCohort cohort;
     private final OnComplete onComplete = mock(OnComplete.class);
-    DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+    private final DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+    private Function<Runnable, Void> pauseLeaderFunction;
 
     @After
     public void tearDown() {
 
     @After
     public void tearDown() {
@@ -36,7 +38,8 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest {
     private void setup() {
         String persistenceId = factory.generateActorId("leader-");
         mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
     private void setup() {
         String persistenceId = factory.generateActorId("leader-");
         mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
-                config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId).underlyingActor();
+                config).pauseLeaderFunction(pauseLeaderFunction).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                persistenceId).underlyingActor();
         cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null);
         cohort.addOnComplete(onComplete);
         mockRaftActor.waitForInitializeBehaviorComplete();
         cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null);
         cohort.addOnComplete(onComplete);
         mockRaftActor.waitForInitializeBehaviorComplete();
@@ -71,7 +74,7 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest {
     public void testNotLeaderOnRun() {
         config.setElectionTimeoutFactor(10000);
         setup();
     public void testNotLeaderOnRun() {
         config.setElectionTimeoutFactor(10000);
         setup();
-        cohort.run();
+        cohort.doTransfer();
         verify(onComplete).onSuccess(mockRaftActor.self(), null);
     }
 
         verify(onComplete).onSuccess(mockRaftActor.self(), null);
     }
 
@@ -81,4 +84,18 @@ public class RaftActorLeadershipTransferCohortTest extends AbstractActorTest {
         cohort.abortTransfer();
         verify(onComplete).onFailure(mockRaftActor.self(), null);
     }
         cohort.abortTransfer();
         verify(onComplete).onFailure(mockRaftActor.self(), null);
     }
+
+    @Test
+    public void testPauseLeaderTimeout() {
+        pauseLeaderFunction = new Function<Runnable, Void>() {
+            @Override
+            public Void apply(Runnable input) {
+                return null;
+            }
+        };
+
+        setup();
+        cohort.init();
+        verify(onComplete, timeout(2000)).onFailure(mockRaftActor.self(), null);
+    }
 }
 }