BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
index b2c9d30184079141e552c21edfafcf8ba8ba2c0b..05968725ccb221cd7b62b0ea38b9b66ea9e52baa 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
+import static akka.actor.ActorRef.noSender;
 import static akka.pattern.Patterns.ask;
 
 import akka.actor.ActorRef;
@@ -21,6 +22,10 @@ import akka.actor.SupervisorStrategy.Directive;
 import akka.cluster.ClusterEvent;
 import akka.cluster.ClusterEvent.MemberWeaklyUp;
 import akka.cluster.Member;
+import akka.cluster.ddata.DistributedData;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.Replicator.Changed;
+import akka.cluster.ddata.Replicator.Subscribe;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
@@ -33,6 +38,8 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -47,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -102,6 +110,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +166,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String persistenceId;
 
+    private final ActorRef replicator;
+
     ShardManager(AbstractShardManagerCreator<?> builder) {
         this.cluster = builder.getCluster();
         this.configuration = builder.getConfiguration();
@@ -180,6 +191,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "shard-manager-" + this.type,
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
         shardManagerMBean.registerMBean();
+
+        replicator = DistributedData.get(context().system()).replicator();
+
+    }
+
+    public void preStart() {
+        LOG.info("Starting Shardmanager {}", persistenceId);
+
+        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
+                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
+        replicator.tell(subscribe, noSender());
     }
 
     @Override
@@ -261,6 +283,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onGetLocalShardIds();
         } else if (message instanceof RunnableMessage) {
             ((RunnableMessage)message).run();
+        } else if (message instanceof Changed) {
+            onConfigChanged((Changed) message);
         } else {
             unknownMessage(message);
         }
@@ -316,6 +340,88 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+        LOG.debug("{}, ShardManager {} received config changed {}",
+                cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
+
+        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+        final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+                changedConfig.values().stream().collect(
+                        Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
+
+        resolveConfig(newConfig);
+    }
+
+    private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+        LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
+                cluster.getCurrentMemberName(), persistenceId, newConfig);
+
+        newConfig.forEach((prefix, config) ->
+                LOG.debug("{} ShardManager : {}, received shard config "
+                        + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
+
+        final SetView<DOMDataTreeIdentifier> removedConfigs =
+                Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
+
+        // resolve removals
+
+        resolveRemovals(removedConfigs);
+
+        final SetView<DOMDataTreeIdentifier> addedConfigs =
+                Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
+        // resolve additions
+
+        resolveAdditions(addedConfigs, newConfig);
+        // iter through to update existing shards, either start/stop replicas or update the shard
+        // to check for more peers
+        resolveUpdates(Collections.emptySet());
+    }
+
+    private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
+        LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
+                cluster.getCurrentMemberName(), persistenceId, removedConfigs);
+
+        removedConfigs.forEach(id -> doRemovePrefixedShard(id));
+    }
+
+    private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
+                                  final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
+        LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
+
+        addedConfigs.stream().filter(identifier
+            -> identifier
+            .getDatastoreType().equals(
+                    ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
+            .forEach(id -> doCreatePrefixedShard(configs.get(id)));
+    }
+
+    private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
+        LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
+    }
+
+    private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
+        LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
+                cluster.getCurrentMemberName(), persistenceId, prefix);
+        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+        final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+        configuration.removePrefixShardConfiguration(prefix);
+
+        if (shard == null) {
+            LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
+            return;
+        }
+
+        if (shard.getActor() != null) {
+            LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+            shard.getActor().tell(Shutdown.INSTANCE, self());
+        }
+        LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
+                persistenceId(), shardId.getShardName());
+        persistShardList();
+    }
+
     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
             String leaderPath) {
         shardReplicaOperationsInProgress.remove(shardId.getShardName());
@@ -468,42 +574,52 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
-        final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+        doCreatePrefixedShard(createPrefixedShard.getConfig());
+        // do not replicate on this level
+    }
+
+    private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
+        LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
 
         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
-                createPrefixedShard.getConfig().getPrefix());
+                config.getPrefix());
         final String shardName = shardId.getShardName();
 
-        configuration.addPrefixShardConfiguration(config);
-
-        DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+        if (localShards.containsKey(shardName)) {
+            LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+            final PrefixShardConfiguration existing =
+                    configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
 
-        if (shardDatastoreContext == null) {
-            final Builder builder = newShardDatastoreContextBuilder(shardName);
-            builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
-                    .storeRoot(config.getPrefix().getRootIdentifier());
-            shardDatastoreContext = builder.build();
-        } else {
-            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
-                    peerAddressResolver).build();
+            if (existing != null && existing.equals(config)) {
+                // we don't have to do nothing here
+                return;
+            }
         }
 
-        final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
-                && currentSnapshot.getShardList().contains(shardName);
+        configuration.addPrefixShardConfiguration(config);
+
+        final Builder builder = newShardDatastoreContextBuilder(shardName);
+        builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                .storeRoot(config.getPrefix().getRootIdentifier());
+        DatastoreContext shardDatastoreContext = builder.build();
 
         final Map<String, String> peerAddresses = Collections.emptyMap();
         final boolean isActiveMember = true;
-        LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
-                persistenceId(), shardId, peerAddresses, isActiveMember);
+        LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
+                        + "{}, peerAddresses: {}, isActiveMember: {}",
+                shardId, persistenceId(), config.getShardMemberNames(),
+                peerAddresses, isActiveMember);
 
         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
-                shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+                shardDatastoreContext, Shard.builder(), peerAddressResolver);
         info.setActiveMember(isActiveMember);
         localShards.put(info.getShardName(), info);
 
         if (schemaContext != null) {
             info.setActor(newShardActor(schemaContext, info));
         }
+
+        persistShardList();
     }
 
     private void doCreateShard(final CreateShard createShard) {
@@ -1094,9 +1210,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
      * @param shardName the shard name
      */
     private Map<String, String> getPeerAddresses(String shardName) {
-        Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
-        Map<String, String> peerAddresses = new HashMap<>();
+        final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+        return getPeerAddresses(shardName, members);
+    }
 
+    private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+        Map<String, String> peerAddresses = new HashMap<>();
         MemberName currentMemberName = this.cluster.getCurrentMemberName();
 
         for (MemberName memberName : members) {
@@ -1371,11 +1490,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
-        saveSnapshot(updateShardManagerSnapshot(shardList));
+        saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
     }
 
-    private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
-        currentSnapshot = new ShardManagerSnapshot(shardList);
+    private ShardManagerSnapshot updateShardManagerSnapshot(
+            final List<String> shardList,
+            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+        currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
         return currentSnapshot;
     }