BUG 2185: Expand the scope of sync status to cover a slow follower
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index d33576d495fa66d2306ebb7d7378691269292e21..5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69 100644 (file)
@@ -18,9 +18,7 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
-import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
-import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -28,22 +26,16 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.PersistentDataProvider;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -62,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
+import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -69,12 +62,13 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The ShardManager has the following jobs,
@@ -115,10 +109,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreContext datastoreContext;
 
-    private Collection<String> knownModules = Collections.emptySet();
-
-    private final DataPersistenceProvider dataPersistenceProvider;
-
     private final CountDownLatch waitTillReadyCountdownLatch;
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
@@ -132,7 +122,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
-        this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
         this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
@@ -146,10 +135,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
-        return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
-    }
-
     public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
@@ -200,7 +185,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ShardNotInitializedTimeout) {
             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
         } else if(message instanceof ShardLeaderStateChanged) {
-            onLeaderStateChanged((ShardLeaderStateChanged)message);
+            onLeaderStateChanged((ShardLeaderStateChanged) message);
+        } else if(message instanceof SwitchShardBehavior){
+            onSwitchShardBehavior((SwitchShardBehavior) message);
         } else {
             unknownMessage(message);
         }
@@ -222,6 +209,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+            shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
                 primaryShardInfoCache.remove(shardInformation.getShardName());
             }
@@ -337,26 +325,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     protected void handleRecover(Object message) throws Exception {
-        if(dataPersistenceProvider.isRecoveryApplicable()) {
-            if (message instanceof SchemaContextModules) {
-                SchemaContextModules msg = (SchemaContextModules) message;
-                knownModules = ImmutableSet.copyOf(msg.getModules());
-            } else if (message instanceof RecoveryFailure) {
-                RecoveryFailure failure = (RecoveryFailure) message;
-                LOG.error("Recovery failed", failure.cause());
-            } else if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
-
-                // Delete all the messages from the akka journal except the last one
-                deleteMessages(lastSequenceNr() - 1);
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
+        if (message instanceof RecoveryCompleted) {
+            LOG.info("Recovery complete : {}", persistenceId());
 
-                // Delete all the messages from the akka journal
-                deleteMessages(lastSequenceNr());
-            }
+            // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+            // journal on upgrade from Helium.
+            deleteMessages(lastSequenceNr());
         }
     }
 
@@ -397,8 +371,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
+                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                if(shardInformation.isShardInitialized()) {
+                    // If the shard is already initialized then we'll wait enough time for the shard to
+                    // elect a leader, ie 2 times the election timeout.
+                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+                }
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
-                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                         getContext().dispatcher(), getSelf());
 
@@ -421,9 +403,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
-        return new NoShardLeaderException(String.format(
-                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
-                "recovering and a leader is being elected. Try again later.", shardId));
+        return new NoShardLeaderException(null, shardId.toString());
     }
 
     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
@@ -477,6 +457,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             if(leaderId != null && leaderId.contains(memberName)) {
                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
                 info.setLeaderAvailable(false);
+
+                primaryShardInfoCache.remove(info.getShardName());
             }
         }
     }
@@ -500,6 +482,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onSwitchShardBehavior(SwitchShardBehavior message) {
+        ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+        ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+        if(shardInformation != null && shardInformation.getActor() != null) {
+            shardInformation.getActor().tell(
+                    new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+        } else {
+            LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+                    message.getShardName(), message.getNewState());
+        }
+    }
+
     /**
      * Notifies all the local shards of a change in the schema context
      *
@@ -508,40 +504,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void updateSchemaContext(final Object message) {
         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
 
-        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
-        Set<String> newModules = new HashSet<>(128);
-
-        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
-            String s = moduleIdentifier.getNamespace().toString();
-            newModules.add(s);
-        }
+        LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
 
-        if(newModules.containsAll(knownModules)) {
-
-            LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
-
-            knownModules = ImmutableSet.copyOf(newModules);
-
-            dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
-
-                @Override
-                public void apply(SchemaContextModules param) throws Exception {
-                    LOG.debug("Sending new SchemaContext to Shards");
-                    for (ShardInformation info : localShards.values()) {
-                        if (info.getActor() == null) {
-                            info.setActor(newShardActor(schemaContext, info));
-                        } else {
-                            info.getActor().tell(message, getSelf());
-                        }
-                    }
-                }
-
-            });
-        } else {
-            LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
-                    newModules, knownModules);
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() == null) {
+                LOG.debug("Creating Shard {}", info.getShardId());
+                info.setActor(newShardActor(schemaContext, info));
+            } else {
+                info.getActor().tell(message, getSelf());
+            }
         }
-
     }
 
     @VisibleForTesting
@@ -571,7 +543,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     String primaryPath = info.getSerializedLeaderActor();
                     Object found = canReturnLocalShardState && info.isLeader() ?
                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
-                                new RemotePrimaryShardFound(primaryPath);
+                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@ -650,8 +622,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
         }
 
-        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+
+        mBean.setShardManager(this);
     }
 
     /**
@@ -698,16 +672,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return "shard-manager-" + type;
     }
 
-    @VisibleForTesting
-    Collection<String> getKnownModules() {
-        return knownModules;
-    }
-
-    @VisibleForTesting
-    DataPersistenceProvider getDataPersistenceProvider() {
-        return dataPersistenceProvider;
-    }
-
     @VisibleForTesting
     ShardManagerInfoMBean getMBean(){
         return mBean;
@@ -731,6 +695,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
         private String role ;
         private String leaderId;
+        private short leaderVersion;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> peerAddresses) {
@@ -796,7 +761,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+                    (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -885,13 +851,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return changed;
         }
 
-        public String getLeaderId() {
+        String getLeaderId() {
             return leaderId;
         }
 
-        public void setLeaderAvailable(boolean leaderAvailable) {
+        void setLeaderAvailable(boolean leaderAvailable) {
             this.leaderAvailable = leaderAvailable;
         }
+
+        short getLeaderVersion() {
+            return leaderVersion;
+        }
+
+        void setLeaderVersion(short leaderVersion) {
+            this.leaderVersion = leaderVersion;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -970,6 +944,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    /**
+     * We no longer persist SchemaContextModules but keep this class around for now for backwards
+     * compatibility so we don't get de-serialization failures on upgrade from Helium.
+     */
+    @Deprecated
     static class SchemaContextModules implements Serializable {
         private static final long serialVersionUID = -8884620101025936590L;