Bug 2970: Remove SchemaContextModules perisistence
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index f4fa7b3a97e8617f8ac474b494a08deb330a0b6d..6de370e1afc1d39601d571d718093e950790d197 100644 (file)
@@ -18,9 +18,7 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
-import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
-import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -28,22 +26,15 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.NonPersistentDataProvider;
-import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -64,12 +55,12 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,26 +105,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreContext datastoreContext;
 
-    private Collection<String> knownModules = Collections.emptySet();
-
-    private final DataPersistenceProvider dataPersistenceProvider;
-
     private final CountDownLatch waitTillReadyCountdownLatch;
 
+    private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+            PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
-        this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
         this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = primaryShardInfoCache;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -141,21 +131,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
-        return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
-    }
-
     public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
         final DatastoreContext datastoreContext,
-        final CountDownLatch waitTillReadyCountdownLatch) {
+        final CountDownLatch waitTillReadyCountdownLatch,
+        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+                waitTillReadyCountdownLatch, primaryShardInfoCache));
     }
 
     @Override
@@ -180,7 +169,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
-            ignoreMessage(message);
+            memberUnreachable((ClusterEvent.UnreachableMember)message);
+        } else if(message instanceof ClusterEvent.ReachableMember) {
+            memberReachable((ClusterEvent.ReachableMember) message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
         } else if(message instanceof RoleChangeNotification) {
@@ -212,7 +203,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
         if(shardInformation != null) {
             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
-            shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+            if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+                primaryShardInfoCache.remove(shardInformation.getShardName());
+            }
+
             checkReady();
         } else {
             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
@@ -324,26 +318,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     protected void handleRecover(Object message) throws Exception {
-        if(dataPersistenceProvider.isRecoveryApplicable()) {
-            if (message instanceof SchemaContextModules) {
-                SchemaContextModules msg = (SchemaContextModules) message;
-                knownModules = ImmutableSet.copyOf(msg.getModules());
-            } else if (message instanceof RecoveryFailure) {
-                RecoveryFailure failure = (RecoveryFailure) message;
-                LOG.error("Recovery failed", failure.cause());
-            } else if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
-
-                // Delete all the messages from the akka journal except the last one
-                deleteMessages(lastSequenceNr() - 1);
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
-                LOG.info("Recovery complete : {}", persistenceId());
+        if (message instanceof RecoveryCompleted) {
+            LOG.info("Recovery complete : {}", persistenceId());
 
-                // Delete all the messages from the akka journal
-                deleteMessages(lastSequenceNr());
-            }
+            // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+            // journal on upgrade from Helium.
+            deleteMessages(lastSequenceNr());
         }
     }
 
@@ -444,6 +424,42 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         checkReady();
     }
 
+    private void memberReachable(ClusterEvent.ReachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberAvailable(memberName);
+    }
+
+    private void memberUnreachable(ClusterEvent.UnreachableMember message) {
+        String memberName = message.member().roles().head();
+        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+
+        markMemberUnavailable(memberName);
+    }
+
+    private void markMemberUnavailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as unavailable.", leaderId);
+                info.setLeaderAvailable(false);
+
+                primaryShardInfoCache.remove(info.getShardName());
+            }
+        }
+    }
+
+    private void markMemberAvailable(final String memberName) {
+        for(ShardInformation info : localShards.values()){
+            String leaderId = info.getLeaderId();
+            if(leaderId != null && leaderId.contains(memberName)) {
+                LOG.debug("Marking Leader {} as available.", leaderId);
+                info.setLeaderAvailable(true);
+            }
+        }
+    }
+
     private void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
         for (ShardInformation info : localShards.values()) {
@@ -461,40 +477,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void updateSchemaContext(final Object message) {
         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
 
-        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
-        Set<String> newModules = new HashSet<>(128);
-
-        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
-            String s = moduleIdentifier.getNamespace().toString();
-            newModules.add(s);
-        }
-
-        if(newModules.containsAll(knownModules)) {
-
-            LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
-
-            knownModules = ImmutableSet.copyOf(newModules);
+        LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
 
-            dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
-
-                @Override
-                public void apply(SchemaContextModules param) throws Exception {
-                    LOG.debug("Sending new SchemaContext to Shards");
-                    for (ShardInformation info : localShards.values()) {
-                        if (info.getActor() == null) {
-                            info.setActor(newShardActor(schemaContext, info));
-                        } else {
-                            info.getActor().tell(message, getSelf());
-                        }
-                    }
-                }
-
-            });
-        } else {
-            LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
-                    newModules, knownModules);
+        for (ShardInformation info : localShards.values()) {
+            if (info.getActor() == null) {
+                LOG.debug("Creating Shard {}", info.getShardId());
+                info.setActor(newShardActor(schemaContext, info));
+            } else {
+                info.getActor().tell(message, getSelf());
+            }
         }
-
     }
 
     @VisibleForTesting
@@ -651,16 +643,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return "shard-manager-" + type;
     }
 
-    @VisibleForTesting
-    Collection<String> getKnownModules() {
-        return knownModules;
-    }
-
-    @VisibleForTesting
-    DataPersistenceProvider getDataPersistenceProvider() {
-        return dataPersistenceProvider;
-    }
-
     @VisibleForTesting
     ShardManagerInfoMBean getMBean(){
         return mBean;
@@ -674,6 +656,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorPath actorPath;
         private final Map<String, String> peerAddresses;
         private Optional<DataTree> localShardDataTree;
+        private boolean leaderAvailable = false;
 
         // flag that determines if the actor is ready for business
         private boolean actorInitialized = false;
@@ -748,7 +731,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -826,10 +809,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return false;
         }
 
-        void setLeaderId(String leaderId) {
+        boolean setLeaderId(String leaderId) {
+            boolean changed = !Objects.equal(this.leaderId, leaderId);
             this.leaderId = leaderId;
-
+            if(leaderId != null) {
+                this.leaderAvailable = true;
+            }
             notifyOnShardInitializedCallbacks();
+
+            return changed;
+        }
+
+        public String getLeaderId() {
+            return leaderId;
+        }
+
+        public void setLeaderAvailable(boolean leaderAvailable) {
+            this.leaderAvailable = leaderAvailable;
         }
     }
 
@@ -840,18 +836,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final Configuration configuration;
         final DatastoreContext datastoreContext;
         private final CountDownLatch waitTillReadyCountdownLatch;
+        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-        ShardManagerCreator(ClusterWrapper cluster,
-                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
+        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+                CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            this.primaryShardInfoCache = primaryShardInfoCache;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+                    primaryShardInfoCache);
         }
     }
 
@@ -906,6 +905,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    /**
+     * We no longer persist SchemaContextModules but keep this class around for now for backwards
+     * compatibility so we don't get de-serialization failures on upgrade from Helium.
+     */
+    @Deprecated
     static class SchemaContextModules implements Serializable {
         private static final long serialVersionUID = -8884620101025936590L;