BUG 2702 : Switch to a low impact scheduling mechanism for IsolatedLeaderCheck 48/15548/3
authorMoiz Raja <moraja@cisco.com>
Fri, 20 Feb 2015 12:37:46 +0000 (04:37 -0800)
committerMoiz Raja <moraja@cisco.com>
Fri, 20 Feb 2015 19:45:13 +0000 (11:45 -0800)
Using the akka scheduler's schedule mechanism to schedule tasks can cause
disruption in processing messages as scheduled messages can get processed before
other messages. To avoid this we now generate an isolated leader check from
the heartbeat processor and avoid an extra scheduled task.

Change-Id: Ied6b518796da92742e2c61eebcd1e0a3415df66a
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java

index 78a1335d58a2ed7cacf78572425be7beb20274ca..fd49737cac45285e1fc1c89d82a157dad5179953 100644 (file)
@@ -75,7 +75,7 @@ public interface ConfigParams {
      * The interval in which the leader needs to check itself if its isolated
      * @return FiniteDuration
      */
-    FiniteDuration getIsolatedCheckInterval();
+    long getIsolatedCheckIntervalInMillis();
 
 
     /**
index 86867e1d040ee84396450ee72f6097093aecd70e..3e6742c17d37c178d30c7d570490ff993c068806 100644 (file)
@@ -42,8 +42,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
-    private FiniteDuration isolatedLeaderCheckInterval =
-        new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+    private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
 
     // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
     // in-memory journal can use before it needs to snapshot
@@ -68,7 +67,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     }
 
     public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
-        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
     }
 
     public void setElectionTimeoutFactor(long electionTimeoutFactor){
@@ -112,7 +111,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     }
 
     @Override
-    public FiniteDuration getIsolatedCheckInterval() {
+    public long getIsolatedCheckIntervalInMillis() {
         return isolatedLeaderCheckInterval;
     }
 
index 66b8fba674161ddbc8f9f2e5a64154b62226c683..1552096ef15aed6bf8f941a1c24474c9ca8cbe44 100644 (file)
@@ -281,6 +281,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
+    protected void beforeSendHeartbeat(){}
+
     @Override
     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
@@ -304,6 +306,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         try {
             if (message instanceof SendHeartBeat) {
+                beforeSendHeartbeat();
                 sendHeartBeat();
                 return this;
 
index 7a94c0c15849038f35105789856f98cb74580d51..ebcdcd40fb078ebcc16439ec2feaa87b6f62eca4 100644 (file)
@@ -8,12 +8,12 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
@@ -38,15 +38,12 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public class Leader extends AbstractLeader {
-    private Cancellable installSnapshotSchedule = null;
-    private Cancellable isolatedLeaderCheckSchedule = null;
+    private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+    private final Stopwatch isolatedLeaderCheck;
 
     public Leader(RaftActorContext context) {
         super(context);
-
-        scheduleIsolatedLeaderCheck(
-            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
-                context.getConfigParams().getHeartBeatInterval().unit()));
+        isolatedLeaderCheck = Stopwatch.createStarted();
     }
 
     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
@@ -54,8 +51,9 @@ public class Leader extends AbstractLeader {
 
         if (originalMessage instanceof IsolatedLeaderCheck) {
             if (isLeaderIsolated()) {
-                LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
                         context.getId(), minIsolatedLeaderPeerCount, leaderId);
+
                 return switchBehavior(new IsolatedLeader(context));
             }
         }
@@ -63,21 +61,17 @@ public class Leader extends AbstractLeader {
         return super.handleMessage(sender, originalMessage);
     }
 
-    protected void stopIsolatedLeaderCheckSchedule() {
-        if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
-            isolatedLeaderCheckSchedule.cancel();
+    @Override
+    protected void beforeSendHeartbeat(){
+        if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+            context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+            isolatedLeaderCheck.reset().start();
         }
-    }
 
-    protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
-        isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
-            context.getActor(), new IsolatedLeaderCheck(),
-            context.getActorSystem().dispatcher(), context.getActor());
     }
 
     @Override
     public void close() throws Exception {
-        stopIsolatedLeaderCheckSchedule();
         super.close();
     }
 
index 853ed5867d4395d460be0692966b1abee92b2fdf..34e1ed9b3b3478032c86e9058998ac23610c7c97 100644 (file)
@@ -1024,7 +1024,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leader = new Leader(leaderActorContext);
-            leader.stopIsolatedLeaderCheckSchedule();
 
             leader.markFollowerActive("follower-1");
             leader.markFollowerActive("follower-2");
index 3e8982371808d3eb2228494ababcc64dd04c844e..d3a3a8fc2df812b88ffc500c71e17d224189b541 100644 (file)
@@ -28,7 +28,7 @@ public class DatastoreContextTest {
         assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
         assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
         assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
-        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckIntervalInMillis());
         assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
         assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
         assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());