BUG 2185: Expand the scope of sync status to cover a slow follower
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 6de370e1afc1d39601d571d718093e950790d197..5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69 100644 (file)
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -53,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -60,11 +62,13 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The ShardManager has the following jobs,
@@ -181,7 +185,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
         } else if(message instanceof ShardLeaderStateChanged) {
-            onLeaderStateChanged((ShardLeaderStateChanged)message);
+            onLeaderStateChanged((ShardLeaderStateChanged) message);
+        } else if(message instanceof SwitchShardBehavior){
+            onSwitchShardBehavior((SwitchShardBehavior) message);
         } else {
             unknownMessage(message);
         }
@@ -203,6 +209,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+            shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
                 primaryShardInfoCache.remove(shardInformation.getShardName());
             }
@@ -364,8 +371,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
+                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                if(shardInformation.isShardInitialized()) {
+                    // If the shard is already initialized then we'll wait enough time for the shard to
+                    // elect a leader, ie 2 times the election timeout.
+                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+                }
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
-                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                         getContext().dispatcher(), getSelf());
 
@@ -388,9 +403,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
-        return new NoShardLeaderException(String.format(
-                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
-                "recovering and a leader is being elected. Try again later.", shardId));
+        return new NoShardLeaderException(null, shardId.toString());
     }
 
     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
@@ -469,6 +482,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onSwitchShardBehavior(SwitchShardBehavior message) {
+        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+        if(shardInformation != null && shardInformation.getActor() != null) {
+            shardInformation.getActor().tell(
+                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+        } else {
+            LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+                    message.getShardName(), message.getNewState());
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
@@ -516,7 +543,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     String primaryPath = info.getSerializedLeaderActor();
                     Object found = canReturnLocalShardState && info.isLeader() ?
                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath);
+                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@ -595,8 +622,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
 
-        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+        mBean.setShardManager(this);
     }
 
     /**
@@ -666,6 +695,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
         private String role ;
         private String leaderId;
+        private short leaderVersion;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> peerAddresses) {
@@ -731,7 +761,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+                    (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -820,13 +851,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return changed;
         }
 
-        public String getLeaderId() {
+        String getLeaderId() {
             return leaderId;
         }
 
-        public void setLeaderAvailable(boolean leaderAvailable) {
+        void setLeaderAvailable(boolean leaderAvailable) {
             this.leaderAvailable = leaderAvailable;
         }
+
+        short getLeaderVersion() {
+            return leaderVersion;
+        }
+
+        void setLeaderVersion(short leaderVersion) {
+            this.leaderVersion = leaderVersion;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {