X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=05968725ccb221cd7b62b0ea38b9b66ea9e52baa;hb=583f30d1c7a8199b401c9393745c62fe27b5ced8;hp=736742cb1c73e49d15760fbbb99098d4bd99619b;hpb=2f77e92af7a68b4a97dbfb709c6cc9b11a49878a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 736742cb1c..05968725cc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; +import static akka.actor.ActorRef.noSender; import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; @@ -21,6 +22,10 @@ import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.Member; +import akka.cluster.ddata.DistributedData; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.Replicator.Changed; +import akka.cluster.ddata.Replicator.Subscribe; import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; @@ -33,9 +38,8 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInputStream; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -50,7 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Supplier; -import org.apache.commons.lang3.SerializationUtils; +import java.util.stream.Collectors; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -84,6 +88,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -105,6 +110,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,6 +166,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String persistenceId; + private final ActorRef replicator; + ShardManager(AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); this.configuration = builder.getConfiguration(); @@ -183,6 +191,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { "shard-manager-" + this.type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); + + replicator = DistributedData.get(context().system()).replicator(); + + } + + public void preStart() { + LOG.info("Starting Shardmanager {}", persistenceId); + + final Subscribe> subscribe = + new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); + replicator.tell(subscribe, noSender()); } @Override @@ -264,6 +283,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetLocalShardIds(); } else if (message instanceof RunnableMessage) { ((RunnableMessage)message).run(); + } else if (message instanceof Changed) { + onConfigChanged((Changed) message); } else { unknownMessage(message); } @@ -319,6 +340,88 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onConfigChanged(final Changed> change) { + LOG.debug("{}, ShardManager {} received config changed {}", + cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries()); + + final Map changedConfig = change.dataValue().getEntries(); + + final Map newConfig = + changedConfig.values().stream().collect( + Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity())); + + resolveConfig(newConfig); + } + + private void resolveConfig(final Map newConfig) { + LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}", + cluster.getCurrentMemberName(), persistenceId, newConfig); + + newConfig.forEach((prefix, config) -> + LOG.debug("{} ShardManager : {}, received shard config " + + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config)); + + final SetView removedConfigs = + Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet()); + + // resolve removals + + resolveRemovals(removedConfigs); + + final SetView addedConfigs = + Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet()); + // resolve additions + + resolveAdditions(addedConfigs, newConfig); + // iter through to update existing shards, either start/stop replicas or update the shard + // to check for more peers + resolveUpdates(Collections.emptySet()); + } + + private void resolveRemovals(final Set removedConfigs) { + LOG.debug("{} ShardManager : {}, resolving removed configs : {}", + cluster.getCurrentMemberName(), persistenceId, removedConfigs); + + removedConfigs.forEach(id -> doRemovePrefixedShard(id)); + } + + private void resolveAdditions(final Set addedConfigs, + final Map configs) { + LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs); + + addedConfigs.stream().filter(identifier + -> identifier + .getDatastoreType().equals( + ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType()))) + .forEach(id -> doCreatePrefixedShard(configs.get(id))); + } + + private void resolveUpdates(Set maybeUpdatedConfigs) { + LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs); + } + + private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) { + LOG.debug("{} ShardManager : {}, removing prefix shard: {}", + cluster.getCurrentMemberName(), persistenceId, prefix); + final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); + final ShardInformation shard = localShards.remove(shardId.getShardName()); + + configuration.removePrefixShardConfiguration(prefix); + + if (shard == null) { + LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix); + return; + } + + if (shard.getActor() != null) { + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor()); + shard.getActor().tell(Shutdown.INSTANCE, self()); + } + LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(), + persistenceId(), shardId.getShardName()); + persistShardList(); + } + private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { shardReplicaOperationsInProgress.remove(shardId.getShardName()); @@ -412,13 +515,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - byte[] shardManagerSnapshot = null; - if (currentSnapshot != null) { - shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot); - } - ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( - new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), + new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); for (ShardInformation shardInfo: localShards.values()) { @@ -476,42 +574,52 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { - final PrefixShardConfiguration config = createPrefixedShard.getConfig(); + doCreatePrefixedShard(createPrefixedShard.getConfig()); + // do not replicate on this level + } + + private void doCreatePrefixedShard(final PrefixShardConfiguration config) { + LOG.debug("doCreatePrefixShard : {}", config.getPrefix()); final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - createPrefixedShard.getConfig().getPrefix()); + config.getPrefix()); final String shardName = shardId.getShardName(); - configuration.addPrefixShardConfiguration(config); - - DatastoreContext shardDatastoreContext = createPrefixedShard.getContext(); + if (localShards.containsKey(shardName)) { + LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); + final PrefixShardConfiguration existing = + configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); - if (shardDatastoreContext == null) { - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name())) - .storeRoot(config.getPrefix().getRootIdentifier()); - shardDatastoreContext = builder.build(); - } else { - shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( - peerAddressResolver).build(); + if (existing != null && existing.equals(config)) { + // we don't have to do nothing here + return; + } } - final boolean shardWasInRecoveredSnapshot = currentSnapshot != null - && currentSnapshot.getShardList().contains(shardName); + configuration.addPrefixShardConfiguration(config); + + final Builder builder = newShardDatastoreContextBuilder(shardName); + builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name())) + .storeRoot(config.getPrefix().getRootIdentifier()); + DatastoreContext shardDatastoreContext = builder.build(); final Map peerAddresses = Collections.emptyMap(); final boolean isActiveMember = true; - LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, peerAddresses, isActiveMember); + LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: " + + "{}, peerAddresses: {}, isActiveMember: {}", + shardId, persistenceId(), config.getShardMemberNames(), + peerAddresses, isActiveMember); final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver); + shardDatastoreContext, Shard.builder(), peerAddressResolver); info.setActiveMember(isActiveMember); localShards.put(info.getShardName(), info); if (schemaContext != null) { info.setActor(newShardActor(schemaContext, info)); } + + persistShardList(); } private void doCreateShard(final CreateShard createShard) { @@ -724,16 +832,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (currentSnapshot == null && restoreFromSnapshot != null && restoreFromSnapshot.getShardManagerSnapshot() != null) { - try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream( - restoreFromSnapshot.getShardManagerSnapshot()))) { - ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject(); + ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot(); - LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot); + LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot); - applyShardManagerSnapshot(snapshot); - } catch (ClassNotFoundException | IOException e) { - LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e); - } + applyShardManagerSnapshot(snapshot); } createLocalShards(); @@ -1107,9 +1210,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName the shard name */ private Map getPeerAddresses(String shardName) { - Collection members = configuration.getMembersFromShardName(shardName); - Map peerAddresses = new HashMap<>(); + final Collection members = configuration.getMembersFromShardName(shardName); + return getPeerAddresses(shardName, members); + } + private Map getPeerAddresses(final String shardName, final Collection members) { + Map peerAddresses = new HashMap<>(); MemberName currentMemberName = this.cluster.getCurrentMemberName(); for (MemberName memberName : members) { @@ -1384,11 +1490,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(updateShardManagerSnapshot(shardList)); + saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations())); } - private ShardManagerSnapshot updateShardManagerSnapshot(List shardList) { - currentSnapshot = new ShardManagerSnapshot(shardList); + private ShardManagerSnapshot updateShardManagerSnapshot( + final List shardList, + final Map allPrefixShardConfigurations) { + currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations); return currentSnapshot; }