From 5370e8be094b802caa732efb4da5a035c53dc9c6 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 31 Aug 2016 17:20:38 +0200 Subject: [PATCH] BUG 2138: Introduce prefix based shards into ShardManager Adds the concept of shards rooted at a DOMDataTreeIdentifier (combination of YangInstanceIdentifier and LogicalDataStore) into the distributed datastore. Change-Id: I43a32556000092c7e7b2ee09b334f82f38ec865b Signed-off-by: Tomas Cere --- .../md-sal/sal-distributed-datastore/pom.xml | 12 ++ .../cluster/datastore/DatastoreContext.java | 16 +- .../datastore/DistributedShardFactory.java | 45 +++++ .../datastore/config/Configuration.java | 16 ++ .../datastore/config/ConfigurationImpl.java | 75 ++++++- .../config/PrefixShardConfiguration.java | 45 +++++ .../messages/AddPrefixShardReplica.java | 36 ++++ .../messages/CreatePrefixedShard.java | 42 ++++ .../datastore/shardmanager/ShardManager.java | 127 +++++++++++- .../shardstrategy/PrefixShardStrategy.java | 34 ++++ .../shardstrategy/ShardStrategyFactory.java | 12 +- .../cluster/datastore/utils/ClusterUtils.java | 42 ++++ .../datastore/AbstractShardManagerTest.java | 95 +++++++++ .../cluster/datastore/AbstractTest.java | 2 + .../cluster/datastore/IntegrationTestKit.java | 17 +- .../datastore/PrefixShardCreationTest.java | 189 ++++++++++++++++++ .../shardmanager/ShardManagerTest.java | 71 +------ .../datastore/utils/MockConfiguration.java | 8 + 18 files changed, 796 insertions(+), 88 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index b4167440d4..b5f1f19f14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -154,6 +154,18 @@ org.opendaylight.mdsal mdsal-eos-dom-api + + org.opendaylight.mdsal + mdsal-dom-spi + + + org.opendaylight.mdsal + mdsal-dom-inmemory-datastore + + + org.opendaylight.mdsal + mdsal-dom-broker + org.opendaylight.controller sal-core-spi diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 0536e4b15a..a81806bc10 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.PeerAddressResolver; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -71,6 +72,7 @@ public class DatastoreContext { private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private String dataStoreName = UNKNOWN_DATA_STORE_TYPE; private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL; + private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; private boolean writeOnlyTransactionOptimizationsEnabled = true; private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS; @@ -91,7 +93,7 @@ public class DatastoreContext { setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE); } - private DatastoreContext(DatastoreContext other) { + private DatastoreContext(final DatastoreContext other) { this.dataStoreProperties = other.dataStoreProperties; this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout; this.operationTimeoutInMillis = other.operationTimeoutInMillis; @@ -105,6 +107,7 @@ public class DatastoreContext { this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; this.dataStoreName = other.dataStoreName; this.logicalStoreType = other.logicalStoreType; + this.storeRoot = other.storeRoot; this.shardBatchedModificationCount = other.shardBatchedModificationCount; this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis; @@ -186,6 +189,10 @@ public class DatastoreContext { return logicalStoreType; } + public YangInstanceIdentifier getStoreRoot() { + return storeRoot; + } + public long getTransactionCreationInitialRateLimit() { return transactionCreationInitialRateLimit; } @@ -407,13 +414,18 @@ public class DatastoreContext { return this; } + public Builder storeRoot(final YangInstanceIdentifier storeRoot) { + datastoreContext.storeRoot = storeRoot; + return this; + } + public Builder dataStoreName(String dataStoreName) { datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName); datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore"; return this; } - public Builder shardBatchedModificationCount(int shardBatchedModificationCount) { + public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) { datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount; return this; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java new file mode 100644 index 0000000000..71005aeac9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import java.util.Collection; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; +import org.opendaylight.yangtools.concepts.Registration; + +/** + * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles + * all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend + * shard/replicas that come along with it. + */ +public interface DistributedShardFactory { + + /** + * Register a new shard that is rooted at the desired prefix with replicas on the provided members. + * Note to register a shard without replicas you still need to provide at least one Member for the shard. + * + * @param prefix Shard root + * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard + * should not be replicated. + * @return ShardRegistration that should be closed if the shard should be destroyed + * @throws DOMDataTreeShardingConflictException If the prefix already has a shard registered + * @throws DOMDataTreeProducerException in case there is a problem closing the initial producer that is used to + * register the shard into the ShardingService + */ + DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix, + Collection replicaMembers) + throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException; + + interface DistributedShardRegistration extends Registration { + @Override + void close(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java index 970f9f6ac6..3bf37e0f38 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java @@ -14,6 +14,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public interface Configuration { @@ -32,6 +33,11 @@ public interface Configuration { */ @Nullable String getShardNameForModule(@Nonnull String moduleName); + /** + * Return the shard name corresponding to the prefix, or null if none is configured. + */ + @Nullable String getShardNameForPrefix(@Nonnull YangInstanceIdentifier prefix); + /** * Returns the member replicas for the given shard name. */ @@ -52,6 +58,11 @@ public interface Configuration { */ void addModuleShardConfiguration(@Nonnull ModuleShardConfiguration config); + /** + * Adds a new configuration for a shard based on prefix. + */ + void addPrefixShardConfiguration(@Nonnull PrefixShardConfiguration config); + /** * Returns a unique set of all member names configured for all shards. */ @@ -71,4 +82,9 @@ public interface Configuration { * Removes the given member as a replica for the given shardName. */ void removeMemberReplicaForShard(String shardName, MemberName memberName); + + /** + * Returns the ShardStrategy for the given prefix or null if the prefix is not found. + */ + @Nullable ShardStrategy getStrategyForPrefix(@Nonnull YangInstanceIdentifier prefix); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java index 2038495c58..59acdbdb01 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.config; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -18,14 +19,24 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +// TODO clean this up once we get rid of module based configuration, prefix one should be alot simpler public class ConfigurationImpl implements Configuration { private volatile Map moduleConfigMap; + // TODO should this be initialized with something? on restart we should restore the shards from configuration? + private volatile Map prefixConfigMap = Collections.emptyMap(); + // Look up maps to speed things up private volatile Map namespaceToModuleName; @@ -108,6 +119,25 @@ public class ConfigurationImpl implements Configuration { return !shardConfigs.isEmpty() ? shardConfigs.iterator().next().getName() : null; } + @Nullable + @Override + public String getShardNameForPrefix(@Nonnull final YangInstanceIdentifier prefix) { + Preconditions.checkNotNull(prefix, "prefix should not be null"); + + Entry bestMatchEntry = + new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null); + + for (Entry entry : prefixConfigMap.entrySet()) { + if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size() + > bestMatchEntry.getKey().getPathArguments().size()) { + bestMatchEntry = entry; + } + } + + //TODO we really should have mapping based on prefix instead of Strings + return ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()); + } + @Override public Collection getMembersFromShardName(final String shardName) { Preconditions.checkNotNull(shardName, "shardName should not be null"); @@ -119,6 +149,12 @@ public class ConfigurationImpl implements Configuration { } } + for (final PrefixShardConfiguration prefixConfig : prefixConfigMap.values()) { + if (shardName.equals(ClusterUtils.getCleanShardName(prefixConfig.getPrefix().getRootIdentifier()))) { + return prefixConfig.getShardMemberNames(); + } + } + return Collections.emptyList(); } @@ -153,6 +189,20 @@ public class ConfigurationImpl implements Configuration { allShardNames = ImmutableSet.builder().addAll(allShardNames).add(config.getShardName()).build(); } + @Override + public void addPrefixShardConfiguration(@Nonnull final PrefixShardConfiguration config) { + Preconditions.checkNotNull(config, "PrefixShardConfiguration cannot be null"); + updatePrefixConfigMap(config); + allShardNames = ImmutableSet.builder().addAll(allShardNames) + .add(ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())).build(); + } + + private void updatePrefixConfigMap(final PrefixShardConfiguration config) { + final Map newPrefixConfigMap = new HashMap<>(prefixConfigMap); + newPrefixConfigMap.put(config.getPrefix().getRootIdentifier(), config); + prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap); + } + private ShardStrategy createShardStrategy(String moduleName, String shardStrategyName) { return ShardStrategyFactory.newShardStrategyInstance(moduleName, shardStrategyName, this); } @@ -195,8 +245,29 @@ public class ConfigurationImpl implements Configuration { } } - private void updateModuleConfigMap(ModuleConfig moduleConfig) { - Map newModuleConfigMap = new HashMap<>(moduleConfigMap); + @Override + public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) { + Preconditions.checkNotNull(prefix, "Prefix cannot be null"); + // FIXME using prefix tables like in mdsal will be better + Entry bestMatchEntry = + new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null); + + for (Entry entry : prefixConfigMap.entrySet()) { + if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size() + > bestMatchEntry.getKey().getPathArguments().size()) { + bestMatchEntry = entry; + } + } + + if (bestMatchEntry.getValue() == null) { + return null; + } + return new PrefixShardStrategy( + ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()), this); + } + + private void updateModuleConfigMap(final ModuleConfig moduleConfig) { + final Map newModuleConfigMap = new HashMap<>(moduleConfigMap); newModuleConfigMap.put(moduleConfig.getName(), moduleConfig); moduleConfigMap = ImmutableMap.copyOf(newModuleConfigMap); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java new file mode 100644 index 0000000000..d18e05b619 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.config; + +import java.io.Serializable; +import java.util.Collection; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; + +/** + * Configuration for prefix based shards. + */ +public class PrefixShardConfiguration implements Serializable { + private static final long serialVersionUID = 1L; + + private final DOMDataTreeIdentifier prefix; + private final String shardStrategyName; + private final Collection shardMemberNames; + + public PrefixShardConfiguration(final DOMDataTreeIdentifier prefix, + final String shardStrategyName, + final Collection shardMemberNames) { + this.prefix = prefix; + this.shardStrategyName = shardStrategyName; + this.shardMemberNames = shardMemberNames; + } + + public DOMDataTreeIdentifier getPrefix() { + return prefix; + } + + public String getShardStrategyName() { + return shardStrategyName; + } + + public Collection getShardMemberNames() { + return shardMemberNames; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java new file mode 100644 index 0000000000..2ac883be8d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * A message sent to the ShardManager to dynamically add a new local shard + * that is a replica for an existing shard that is already available in the + * cluster. + */ + +public class AddPrefixShardReplica { + + private final YangInstanceIdentifier prefix; + + public AddPrefixShardReplica(final YangInstanceIdentifier prefix) { + this.prefix = Preconditions.checkNotNull(prefix); + } + + public YangInstanceIdentifier getPrefix() { + return prefix; + } + + @Override + public String toString() { + return "AddPrefixShardReplica[ShardName=" + prefix + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java new file mode 100644 index 0000000000..9a87a50383 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; + +/** + * Message sent to the ShardManager to create a shard located at a certain logical position in the dataTree. + */ +public class CreatePrefixedShard { + + private final PrefixShardConfiguration config; + private final DatastoreContext context; + private final Shard.Builder builder; + + public CreatePrefixedShard(final PrefixShardConfiguration config, + final DatastoreContext context, + final Shard.Builder builder) { + this.config = config; + this.context = context; + this.builder = builder; + } + + public PrefixShardConfiguration getConfig() { + return config; + } + + public DatastoreContext getContext() { + return context; + } + + public Shard.Builder getShardBuilder() { + return builder; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index bbf5cf3100..db998f55ac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -55,18 +55,22 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -80,6 +84,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -99,6 +104,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -221,14 +227,19 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if (message instanceof SwitchShardBehavior) { onSwitchShardBehavior((SwitchShardBehavior) message); } else if (message instanceof CreateShard) { - onCreateShard((CreateShard) message); + onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); + } else if (message instanceof CreatePrefixedShard) { + onCreatePrefixedShard((CreatePrefixedShard) message); + } else if (message instanceof AddPrefixShardReplica) { + onAddPrefixShardReplica((AddPrefixShardReplica) message); } else if (message instanceof ForwardedAddServerReply) { - ForwardedAddServerReply msg = (ForwardedAddServerReply) message; - onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure); + ForwardedAddServerReply msg = (ForwardedAddServerReply)message; + onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, + msg.removeShardOnFailure); } else if (message instanceof ForwardedAddServerFailure) { - ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message; + ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); @@ -287,7 +298,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure); } else { int nfailed = 0; - for (Boolean result: results) { + for (Boolean result : results) { if (!result) { nfailed++; } @@ -415,6 +426,31 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + @SuppressWarnings("checkstyle:IllegalCatch") + private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { + LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard); + + Object reply; + try { + final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), + createPrefixedShard.getConfig().getPrefix()); + if (localShards.containsKey(shardId.getShardName())) { + LOG.debug("{}: Shard {} already exists", persistenceId(), shardId); + reply = new Status.Success(String.format("Shard with name %s already exists", shardId)); + } else { + doCreatePrefixedShard(createPrefixedShard); + reply = new Status.Success(null); + } + } catch (final Exception e) { + LOG.error("{}: onCreateShard failed", persistenceId(), e); + reply = new Status.Failure(e); + } + + if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } + @SuppressWarnings("checkstyle:IllegalCatch") private void onCreateShard(CreateShard createShard) { LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); @@ -439,9 +475,48 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void doCreateShard(CreateShard createShard) { - ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); - String shardName = moduleShardConfig.getShardName(); + private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { + final PrefixShardConfiguration config = createPrefixedShard.getConfig(); + + final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), + createPrefixedShard.getConfig().getPrefix()); + final String shardName = shardId.getShardName(); + + configuration.addPrefixShardConfiguration(config); + + DatastoreContext shardDatastoreContext = createPrefixedShard.getContext(); + + if (shardDatastoreContext == null) { + final Builder builder = newShardDatastoreContextBuilder(shardName); + builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name())) + .storeRoot(config.getPrefix().getRootIdentifier()); + shardDatastoreContext = builder.build(); + } else { + shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver( + peerAddressResolver).build(); + } + + final boolean shardWasInRecoveredSnapshot = currentSnapshot != null + && currentSnapshot.getShardList().contains(shardName); + + final Map peerAddresses = Collections.emptyMap(); + final boolean isActiveMember = true; + LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", + persistenceId(), shardId, peerAddresses, isActiveMember); + + final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, + shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver); + info.setActiveMember(isActiveMember); + localShards.put(info.getShardName(), info); + + if (schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); + } + } + + private void doCreateShard(final CreateShard createShard) { + final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); + final String shardName = moduleShardConfig.getShardName(); configuration.addModuleShardConfiguration(moduleShardConfig); @@ -1078,6 +1153,42 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + + // With this message the shard does NOT have to be preconfigured + // do a dynamic lookup if the shard exists somewhere and replicate + private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) { + final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix()); + + LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg); + + if (schemaContext == null) { + final String msg = String.format( + "No SchemaContext is available in order to create a local shard instance for %s", shardName); + LOG.debug("{}: {}", persistenceId(), msg); + getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); + return; + } + + findPrimary(shardName, + new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + addShard(getShardName(), response, getSender()); + } + }, getTargetActor()); + } + + @Override + public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { + sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); + } + } + ); + } + private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java new file mode 100644 index 0000000000..1e08a989d5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.shardstrategy; + +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Shard Strategy that resolves a path to a prefix shard name. + */ +public class PrefixShardStrategy implements ShardStrategy { + + public static final String NAME = "prefix"; + + private final String shardName; + private final Configuration configuration; + + public PrefixShardStrategy(final String shardName, final Configuration configuration) { + this.shardName = shardName; + this.configuration = configuration; + } + + @Override + public String findShard(final YangInstanceIdentifier path) { + final String shardNameForPrefix = configuration.getShardNameForPrefix(path); + return shardNameForPrefix != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java index a26dc56a75..a97adc2730 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java @@ -25,10 +25,16 @@ public class ShardStrategyFactory { public ShardStrategy getStrategy(final YangInstanceIdentifier path) { Preconditions.checkNotNull(path, "path should not be null"); - String moduleName = getModuleName(path); - ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName); + // try with the legacy module based shard mapping + final String moduleName = getModuleName(path); + final ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName); if (shardStrategy == null) { - return DefaultShardStrategy.getInstance(); + // retry with prefix based sharding + final ShardStrategy strategyForPrefix = configuration.getStrategyForPrefix(path); + if (strategyForPrefix == null) { + return DefaultShardStrategy.getInstance(); + } + return strategyForPrefix; } return shardStrategy; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java new file mode 100644 index 0000000000..dba1761130 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Utils for encoding prefix shard name. + */ +public class ClusterUtils { + + public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) { + return ShardIdentifier + .create(getCleanShardName(prefix.getRootIdentifier()), memberName, prefix.getDatastoreType().name()); + } + + /** + * Returns an encoded shard name based on the provided path that should doesn't contain characters that cannot be + * present in akka actor paths. + * + * @param path Path on which to base the shard name + * @return encoded name that doesn't contain characters that cannot be in actor path. + */ + public static String getCleanShardName(final YangInstanceIdentifier path) { + final StringBuilder builder = new StringBuilder(); + // TODO need a better mapping that includes namespace, but we'll need to cleanup the string beforehand + path.getPathArguments().forEach(p -> { + builder.append(p.getNodeType().getLocalName()); + builder.append("!"); + }); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java new file mode 100644 index 0000000000..15e6b03645 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import static org.mockito.MockitoAnnotations.initMocks; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mock; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; + +public class AbstractShardManagerTest extends AbstractActorTest { + + protected static final MemberName MEMBER_1 = MemberName.forName("member-1"); + + protected static int ID_COUNTER = 1; + protected static TestActorRef mockShardActor; + protected static ShardIdentifier mockShardName; + + protected final String shardMrgIDSuffix = "config" + ID_COUNTER++; + protected final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + protected final Collection actorSystems = new ArrayList<>(); + protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() + .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); + + @Mock + protected static CountDownLatch ready; + + protected TestShardManager.Builder newTestShardMgrBuilder() { + return TestShardManager.builder(datastoreContextBuilder); + } + + protected TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) { + return TestShardManager.builder(datastoreContextBuilder).configuration(config); + } + + protected Props newShardMgrProps(final Configuration config) { + return newTestShardMgrBuilder(config).waitTillReadyCountDownLatch(ready).props(); + } + + @Before + public void setUp() throws Exception { + initMocks(this); + + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + + if (mockShardActor == null) { + mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config"); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), + mockShardName.toString()); + } + + mockShardActor.underlyingActor().clear(); + } + + @After + public void tearDown() { + InMemoryJournal.clear(); + InMemorySnapshotStore.clear(); + + for (final ActorSystem system : actorSystems) { + JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE); + } + + mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + mockShardActor = null; + + actorFactory.close(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java index 5dc04f22f0..8256edd272 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java @@ -17,6 +17,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier public abstract class AbstractTest { protected static final MemberName MEMBER_NAME = MemberName.forName("member-1"); + protected static final MemberName MEMBER_2_NAME = MemberName.forName("member-2"); + private static final FrontendType FRONTEND_TYPE = FrontendType.forName(ShardTransactionTest.class.getSimpleName()); protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 57dedf2f3c..f09441e07e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -68,14 +68,14 @@ public class IntegrationTestKit extends ShardTestKit { SchemaContextHelper.full(), shardNames); } - public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig, - boolean waitUntilLeader, String... shardNames) { + public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, + final boolean waitUntilLeader, final String... shardNames) { return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader, SchemaContextHelper.full(), shardNames); } - public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig, - boolean waitUntilLeader, SchemaContext schemaContext, String... shardNames) { + public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig, + final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) { final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem()); final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf"); @@ -230,9 +230,12 @@ public class IntegrationTestKit extends ShardTestKit { void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain, Class expType) throws Exception { - assertExceptionOnCall(() -> { - txChain.newWriteOnlyTransaction(); - return null; + assertExceptionOnCall(new Callable() { + @Override + public Void call() throws Exception { + txChain.newWriteOnlyTransaction(); + return null; + } }, expType); assertExceptionOnCall(() -> { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java new file mode 100644 index 0000000000..2a3431f67f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; +import akka.actor.Status.Success; +import akka.cluster.Cluster; +import akka.dispatch.Dispatchers; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.Shard.Builder; +import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; +import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; +import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager; +import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests prefix shard creation in ShardManager. + */ +public class PrefixShardCreationTest extends AbstractShardManagerTest { + + private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class); + + private static final DOMDataTreeIdentifier TEST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + + private static final MemberName MEMBER_2 = MemberName.forName("member-2"); + + @Test + public void testPrefixShardCreation() throws Exception { + + LOG.info("testPrefixShardCreation starting"); + new JavaTestKit(getSystem()) { + { + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + final ActorRef shardManager = actorFactory.createActor(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + final SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + final Builder builder = Shard.builder(); + + final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard( + new PrefixShardConfiguration(TEST_ID, + PrefixShardStrategy.NAME, + Collections.singletonList(MEMBER_1)), + datastoreContextBuilder.build(), builder); + + shardManager.tell(createPrefixedShard, getRef()); + expectMsgClass(duration("5 seconds"), Success.class); + + shardManager.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + } + }; + } + + @Test + public void testPrefixShardReplicas() throws Exception { + LOG.info("testPrefixShardReplicas starting"); + final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create ACtorSystem for member-1 + final ActorSystem system1 = newActorSystem("Member1"); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .waitTillReadyCountDownLatch(ready) + .cluster(new ClusterWrapperImpl(system1)) + .props().withDispatcher(Dispatchers.DefaultDispatcherId()), + shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = newActorSystem("Member2"); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder() + .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider())) + .waitTillReadyCountDownLatch(ready) + .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), + shardManagerID); + + final JavaTestKit kit2 = new JavaTestKit(system2); + + new JavaTestKit(system1) { + { + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + // check shard does not exist + shardManager1.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + shardManager2.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); + kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + // create shard on node1 + final Builder builder = Shard.builder(); + + final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard( + new PrefixShardConfiguration(TEST_ID, + PrefixShardStrategy.NAME, + Lists.newArrayList(MEMBER_1, MEMBER_2)), + datastoreContextBuilder.build(), builder); + + shardManager1.tell(createPrefixedShard, getRef()); + expectMsgClass(duration("5 seconds"), Success.class); + + shardManager1.underlyingActor().waitForMemberUp(); + + LOG.info("changed leader state"); + + // check node2 cannot find it locally + shardManager1.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + shardManager2.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); + kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + // but can remotely + shardManager2.tell(new FindPrimary( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); + kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + + // add replica and verify if succesful + shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef()); + kit2.expectMsgClass(duration("5 seconds"), Success.class); + + // verify we have a replica on manager2 now + shardManager2.tell(new FindLocalShard( + ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef()); + kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class); + } + }; + } + + private ActorSystem newActorSystem(final String config) { + final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config)); + actorSystems.add(system); + return system; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index 705f3c40d8..521a0b86bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -47,7 +47,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.AbstractMap; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -61,21 +60,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.commons.lang3.SerializationUtils; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules; -import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; @@ -111,7 +105,6 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; @@ -137,55 +130,13 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -public class ShardManagerTest extends AbstractActorTest { +public class ShardManagerTest extends AbstractShardManagerTest { private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class); - private static final MemberName MEMBER_1 = MemberName.forName("member-1"); private static final MemberName MEMBER_2 = MemberName.forName("member-2"); private static final MemberName MEMBER_3 = MemberName.forName("member-3"); - private static int ID_COUNTER = 1; - - private final String shardMrgIDSuffix = "config" + ID_COUNTER++; private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - @Mock - private static CountDownLatch ready; - - private static ShardIdentifier mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config"); - - private static TestActorRef mockShardActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class), mockShardName.toString()); - - private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() - .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) - .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); - - private final Collection actorSystems = new ArrayList<>(); - - private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - - mockShardActor.underlyingActor().clear(); - } - - @After - public void tearDown() { - InMemoryJournal.clear(); - InMemorySnapshotStore.clear(); - - for (ActorSystem system: actorSystems) { - JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE); - } - - actorFactory.close(); - } - private ActorSystem newActorSystem(String config) { ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config)); actorSystems.add(system); @@ -205,10 +156,6 @@ public class ShardManagerTest extends AbstractActorTest { return newShardMgrProps(new MockConfiguration()); } - private Props newShardMgrProps(Configuration config) { - return newTestShardMgrBuilder(config).props(); - } - private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) { DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); @@ -216,14 +163,6 @@ public class ShardManagerTest extends AbstractActorTest { return mockFactory; } - private TestShardManager.Builder newTestShardMgrBuilder() { - return TestShardManager.builder(datastoreContextBuilder); - } - - private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) { - return TestShardManager.builder(datastoreContextBuilder).configuration(config); - } - private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() { return newTestShardMgrBuilderWithMockShardActor(mockShardActor); } @@ -2157,7 +2096,7 @@ public class ShardManagerTest extends AbstractActorTest { }; } - private static class TestShardManager extends ShardManager { + public static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final CountDownLatch snapshotPersist = new CountDownLatch(1); private ShardManagerSnapshot snapshot; @@ -2225,7 +2164,7 @@ public class ShardManagerTest extends AbstractActorTest { Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } - void waitForMemberUp() { + public void waitForMemberUp() { assertEquals("MemberUp received", true, Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); memberUpReceived = new CountDownLatch(1); @@ -2260,7 +2199,7 @@ public class ShardManagerTest extends AbstractActorTest { return new Builder(datastoreContextBuilder); } - private static class Builder extends AbstractGenericCreator { + public static class Builder extends AbstractGenericCreator { private ActorRef shardActor; private final Map shardActors = new HashMap<>(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java index 312d0efec1..3a4c38346f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java @@ -14,9 +14,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.ModuleConfig; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public class MockConfiguration extends ConfigurationImpl { public MockConfiguration() { @@ -36,4 +39,9 @@ public class MockConfiguration extends ConfigurationImpl { return retMap; }); } + + @Override + public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) { + return null; + } } -- 2.36.6