BUG 2185: Expand the scope of sync status to cover a slow follower
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 4f3c521f442ea10765d06dd4f0372d72968e712a..5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69 100644 (file)
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -53,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -60,11 +62,13 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The ShardManager has the following jobs,
@@ -181,7 +185,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
         } else if(message instanceof ShardLeaderStateChanged) {
-            onLeaderStateChanged((ShardLeaderStateChanged)message);
+            onLeaderStateChanged((ShardLeaderStateChanged) message);
+        } else if(message instanceof SwitchShardBehavior){
+            onSwitchShardBehavior((SwitchShardBehavior) message);
         } else {
             unknownMessage(message);
         }
@@ -365,8 +371,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
+                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                if(shardInformation.isShardInitialized()) {
+                    // If the shard is already initialized then we'll wait enough time for the shard to
+                    // elect a leader, ie 2 times the election timeout.
+                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+                }
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
-                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                         getContext().dispatcher(), getSelf());
 
@@ -389,9 +403,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
-        return new NoShardLeaderException(String.format(
-                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
-                "recovering and a leader is being elected. Try again later.", shardId));
+        return new NoShardLeaderException(null, shardId.toString());
     }
 
     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
@@ -470,6 +482,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onSwitchShardBehavior(SwitchShardBehavior message) {
+        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+        if(shardInformation != null && shardInformation.getActor() != null) {
+            shardInformation.getActor().tell(
+                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+        } else {
+            LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+                    message.getShardName(), message.getNewState());
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
@@ -596,8 +622,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
 
-        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+        mBean.setShardManager(this);
     }
 
     /**
@@ -733,7 +761,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+                    (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {