BUG 2138: Introduce prefix based shards into ShardManager 05/44705/48
authorTomas Cere <tcere@cisco.com>
Wed, 31 Aug 2016 15:20:38 +0000 (17:20 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 13 Dec 2016 12:49:05 +0000 (12:49 +0000)
Adds the concept of shards rooted at a DOMDataTreeIdentifier
(combination of YangInstanceIdentifier and LogicalDataStore)
into the distributed datastore.

Change-Id: I43a32556000092c7e7b2ee09b334f82f38ec865b
Signed-off-by: Tomas Cere <tcere@cisco.com>
18 files changed:
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java

index b416744..b5f1f19 100644 (file)
       <groupId>org.opendaylight.mdsal</groupId>
       <artifactId>mdsal-eos-dom-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.mdsal</groupId>
+      <artifactId>mdsal-dom-spi</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.mdsal</groupId>
+      <artifactId>mdsal-dom-inmemory-datastore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.mdsal</groupId>
+      <artifactId>mdsal-dom-broker</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-core-spi</artifactId>
index 0536e4b..a81806b 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -71,6 +72,7 @@ public class DatastoreContext {
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
     private String dataStoreName = UNKNOWN_DATA_STORE_TYPE;
     private LogicalDatastoreType logicalStoreType = LogicalDatastoreType.OPERATIONAL;
+    private YangInstanceIdentifier storeRoot = YangInstanceIdentifier.EMPTY;
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
@@ -91,7 +93,7 @@ public class DatastoreContext {
         setShardSnapshotChunkSize(DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE);
     }
 
-    private DatastoreContext(DatastoreContext other) {
+    private DatastoreContext(final DatastoreContext other) {
         this.dataStoreProperties = other.dataStoreProperties;
         this.shardTransactionIdleTimeout = other.shardTransactionIdleTimeout;
         this.operationTimeoutInMillis = other.operationTimeoutInMillis;
@@ -105,6 +107,7 @@ public class DatastoreContext {
         this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
         this.dataStoreName = other.dataStoreName;
         this.logicalStoreType = other.logicalStoreType;
+        this.storeRoot = other.storeRoot;
         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
@@ -186,6 +189,10 @@ public class DatastoreContext {
         return logicalStoreType;
     }
 
+    public YangInstanceIdentifier getStoreRoot() {
+        return storeRoot;
+    }
+
     public long getTransactionCreationInitialRateLimit() {
         return transactionCreationInitialRateLimit;
     }
@@ -407,13 +414,18 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder storeRoot(final YangInstanceIdentifier storeRoot) {
+            datastoreContext.storeRoot = storeRoot;
+            return this;
+        }
+
         public Builder dataStoreName(String dataStoreName) {
             datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
             datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
             return this;
         }
 
-        public Builder shardBatchedModificationCount(int shardBatchedModificationCount) {
+        public Builder shardBatchedModificationCount(final int shardBatchedModificationCount) {
             datastoreContext.shardBatchedModificationCount = shardBatchedModificationCount;
             return this;
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedShardFactory.java
new file mode 100644 (file)
index 0000000..71005ae
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.yangtools.concepts.Registration;
+
+/**
+ * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles
+ * all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend
+ * shard/replicas that come along with it.
+ */
+public interface DistributedShardFactory {
+
+    /**
+     * Register a new shard that is rooted at the desired prefix with replicas on the provided members.
+     * Note to register a shard without replicas you still need to provide at least one Member for the shard.
+     *
+     * @param prefix         Shard root
+     * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard
+     *                       should not be replicated.
+     * @return ShardRegistration that should be closed if the shard should be destroyed
+     * @throws DOMDataTreeShardingConflictException If the prefix already has a shard registered
+     * @throws DOMDataTreeProducerException in case there is a problem closing the initial producer that is used to
+     *                                      register the shard into the ShardingService
+     */
+    DistributedShardRegistration createDistributedShard(DOMDataTreeIdentifier prefix,
+                                                        Collection<MemberName> replicaMembers)
+            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException;
+
+    interface DistributedShardRegistration extends Registration {
+        @Override
+        void close();
+    }
+}
\ No newline at end of file
index 970f9f6..3bf37e0 100644 (file)
@@ -14,6 +14,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public interface Configuration {
 
@@ -32,6 +33,11 @@ public interface Configuration {
      */
     @Nullable String getShardNameForModule(@Nonnull String moduleName);
 
+    /**
+     * Return the shard name corresponding to the prefix, or null if none is configured.
+     */
+    @Nullable String getShardNameForPrefix(@Nonnull YangInstanceIdentifier prefix);
+
     /**
      * Returns the member replicas for the given shard name.
      */
@@ -52,6 +58,11 @@ public interface Configuration {
      */
     void addModuleShardConfiguration(@Nonnull ModuleShardConfiguration config);
 
+    /**
+     * Adds a new configuration for a shard based on prefix.
+     */
+    void addPrefixShardConfiguration(@Nonnull PrefixShardConfiguration config);
+
     /**
      * Returns a unique set of all member names configured for all shards.
      */
@@ -71,4 +82,9 @@ public interface Configuration {
      * Removes the given member as a replica for the given shardName.
      */
     void removeMemberReplicaForShard(String shardName, MemberName memberName);
+
+    /**
+     * Returns the ShardStrategy for the given prefix or null if the prefix is not found.
+     */
+    @Nullable ShardStrategy getStrategyForPrefix(@Nonnull YangInstanceIdentifier prefix);
 }
index 2038495..59acdbd 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.config;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -18,14 +19,24 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
+// TODO clean this up once we get rid of module based configuration, prefix one should be alot simpler
 public class ConfigurationImpl implements Configuration {
     private volatile Map<String, ModuleConfig> moduleConfigMap;
 
+    // TODO should this be initialized with something? on restart we should restore the shards from configuration?
+    private volatile Map<YangInstanceIdentifier, PrefixShardConfiguration> prefixConfigMap = Collections.emptyMap();
+
     // Look up maps to speed things up
 
     private volatile Map<String, String> namespaceToModuleName;
@@ -108,6 +119,25 @@ public class ConfigurationImpl implements Configuration {
         return !shardConfigs.isEmpty() ? shardConfigs.iterator().next().getName() : null;
     }
 
+    @Nullable
+    @Override
+    public String getShardNameForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+        Preconditions.checkNotNull(prefix, "prefix should not be null");
+
+        Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
+                new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+
+        for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+            if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
+                    > bestMatchEntry.getKey().getPathArguments().size()) {
+                bestMatchEntry = entry;
+            }
+        }
+
+        //TODO we really should have mapping based on prefix instead of Strings
+        return ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier());
+    }
+
     @Override
     public Collection<MemberName> getMembersFromShardName(final String shardName) {
         Preconditions.checkNotNull(shardName, "shardName should not be null");
@@ -119,6 +149,12 @@ public class ConfigurationImpl implements Configuration {
             }
         }
 
+        for (final PrefixShardConfiguration prefixConfig : prefixConfigMap.values()) {
+            if (shardName.equals(ClusterUtils.getCleanShardName(prefixConfig.getPrefix().getRootIdentifier()))) {
+                return prefixConfig.getShardMemberNames();
+            }
+        }
+
         return Collections.emptyList();
     }
 
@@ -153,6 +189,20 @@ public class ConfigurationImpl implements Configuration {
         allShardNames = ImmutableSet.<String>builder().addAll(allShardNames).add(config.getShardName()).build();
     }
 
+    @Override
+    public void addPrefixShardConfiguration(@Nonnull final PrefixShardConfiguration config) {
+        Preconditions.checkNotNull(config, "PrefixShardConfiguration cannot be null");
+        updatePrefixConfigMap(config);
+        allShardNames = ImmutableSet.<String>builder().addAll(allShardNames)
+                .add(ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())).build();
+    }
+
+    private void updatePrefixConfigMap(final PrefixShardConfiguration config) {
+        final Map<YangInstanceIdentifier, PrefixShardConfiguration> newPrefixConfigMap = new HashMap<>(prefixConfigMap);
+        newPrefixConfigMap.put(config.getPrefix().getRootIdentifier(), config);
+        prefixConfigMap = ImmutableMap.copyOf(newPrefixConfigMap);
+    }
+
     private ShardStrategy createShardStrategy(String moduleName, String shardStrategyName) {
         return ShardStrategyFactory.newShardStrategyInstance(moduleName, shardStrategyName, this);
     }
@@ -195,8 +245,29 @@ public class ConfigurationImpl implements Configuration {
         }
     }
 
-    private void updateModuleConfigMap(ModuleConfig moduleConfig) {
-        Map<String, ModuleConfig> newModuleConfigMap = new HashMap<>(moduleConfigMap);
+    @Override
+    public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+        Preconditions.checkNotNull(prefix, "Prefix cannot be null");
+        // FIXME using prefix tables like in mdsal will be better
+        Entry<YangInstanceIdentifier, PrefixShardConfiguration> bestMatchEntry =
+                new SimpleEntry<>(YangInstanceIdentifier.EMPTY, null);
+
+        for (Entry<YangInstanceIdentifier, PrefixShardConfiguration> entry : prefixConfigMap.entrySet()) {
+            if (entry.getKey().contains(prefix) && entry.getKey().getPathArguments().size()
+                    > bestMatchEntry.getKey().getPathArguments().size()) {
+                bestMatchEntry = entry;
+            }
+        }
+
+        if (bestMatchEntry.getValue() == null) {
+            return null;
+        }
+        return new PrefixShardStrategy(
+                ClusterUtils.getCleanShardName(bestMatchEntry.getValue().getPrefix().getRootIdentifier()), this);
+    }
+
+    private void updateModuleConfigMap(final ModuleConfig moduleConfig) {
+        final Map<String, ModuleConfig> newModuleConfigMap = new HashMap<>(moduleConfigMap);
         newModuleConfigMap.put(moduleConfig.getName(), moduleConfig);
         moduleConfigMap = ImmutableMap.copyOf(newModuleConfigMap);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/PrefixShardConfiguration.java
new file mode 100644 (file)
index 0000000..d18e05b
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.config;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+
+/**
+ * Configuration for prefix based shards.
+ */
+public class PrefixShardConfiguration implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final DOMDataTreeIdentifier prefix;
+    private final String shardStrategyName;
+    private final Collection<MemberName> shardMemberNames;
+
+    public PrefixShardConfiguration(final DOMDataTreeIdentifier prefix,
+                                    final String shardStrategyName,
+                                    final Collection<MemberName> shardMemberNames) {
+        this.prefix = prefix;
+        this.shardStrategyName = shardStrategyName;
+        this.shardMemberNames = shardMemberNames;
+    }
+
+    public DOMDataTreeIdentifier getPrefix() {
+        return prefix;
+    }
+
+    public String getShardStrategyName() {
+        return shardStrategyName;
+    }
+
+    public Collection<MemberName> getShardMemberNames() {
+        return shardMemberNames;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AddPrefixShardReplica.java
new file mode 100644 (file)
index 0000000..2ac883b
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * A message sent to the ShardManager to dynamically add a new local shard
+ * that is a replica for an existing shard that is already available in the
+ * cluster.
+ */
+
+public class AddPrefixShardReplica {
+
+    private final YangInstanceIdentifier prefix;
+
+    public AddPrefixShardReplica(final YangInstanceIdentifier prefix) {
+        this.prefix = Preconditions.checkNotNull(prefix);
+    }
+
+    public YangInstanceIdentifier getPrefix() {
+        return prefix;
+    }
+
+    @Override
+    public String toString() {
+        return "AddPrefixShardReplica[ShardName=" + prefix + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreatePrefixedShard.java
new file mode 100644 (file)
index 0000000..9a87a50
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+
+/**
+ * Message sent to the ShardManager to create a shard located at a certain logical position in the dataTree.
+ */
+public class CreatePrefixedShard {
+
+    private final PrefixShardConfiguration config;
+    private final DatastoreContext context;
+    private final Shard.Builder builder;
+
+    public CreatePrefixedShard(final PrefixShardConfiguration config,
+                               final DatastoreContext context,
+                               final Shard.Builder builder) {
+        this.config = config;
+        this.context = context;
+        this.builder = builder;
+    }
+
+    public PrefixShardConfiguration getConfig() {
+        return config;
+    }
+
+    public DatastoreContext getContext() {
+        return context;
+    }
+
+    public Shard.Builder getShardBuilder() {
+        return builder;
+    }
+}
index bbf5cf3..db998f5 100644 (file)
@@ -55,18 +55,22 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -80,6 +84,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShard
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -99,6 +104,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -221,14 +227,19 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if (message instanceof SwitchShardBehavior) {
             onSwitchShardBehavior((SwitchShardBehavior) message);
         } else if (message instanceof CreateShard) {
-            onCreateShard((CreateShard) message);
+            onCreateShard((CreateShard)message);
         } else if (message instanceof AddShardReplica) {
             onAddShardReplica((AddShardReplica) message);
+        } else if (message instanceof CreatePrefixedShard) {
+            onCreatePrefixedShard((CreatePrefixedShard) message);
+        } else if (message instanceof AddPrefixShardReplica) {
+            onAddPrefixShardReplica((AddPrefixShardReplica) message);
         } else if (message instanceof ForwardedAddServerReply) {
-            ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
-            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
+            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+                    msg.removeShardOnFailure);
         } else if (message instanceof ForwardedAddServerFailure) {
-            ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
+            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
         } else if (message instanceof RemoveShardReplica) {
             onRemoveShardReplica((RemoveShardReplica) message);
@@ -287,7 +298,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
                 } else {
                     int nfailed = 0;
-                    for (Boolean result: results) {
+                    for (Boolean result : results) {
                         if (!result) {
                             nfailed++;
                         }
@@ -415,6 +426,31 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+        LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
+
+        Object reply;
+        try {
+            final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+                    createPrefixedShard.getConfig().getPrefix());
+            if (localShards.containsKey(shardId.getShardName())) {
+                LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
+                reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
+            } else {
+                doCreatePrefixedShard(createPrefixedShard);
+                reply = new Status.Success(null);
+            }
+        } catch (final Exception e) {
+            LOG.error("{}: onCreateShard failed", persistenceId(), e);
+            reply = new Status.Failure(e);
+        }
+
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void onCreateShard(CreateShard createShard) {
         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
@@ -439,9 +475,48 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private void doCreateShard(CreateShard createShard) {
-        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-        String shardName = moduleShardConfig.getShardName();
+    private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+        final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+
+        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+                createPrefixedShard.getConfig().getPrefix());
+        final String shardName = shardId.getShardName();
+
+        configuration.addPrefixShardConfiguration(config);
+
+        DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+
+        if (shardDatastoreContext == null) {
+            final Builder builder = newShardDatastoreContextBuilder(shardName);
+            builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                    .storeRoot(config.getPrefix().getRootIdentifier());
+            shardDatastoreContext = builder.build();
+        } else {
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                    peerAddressResolver).build();
+        }
+
+        final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+                && currentSnapshot.getShardList().contains(shardName);
+
+        final Map<String, String> peerAddresses = Collections.emptyMap();
+        final boolean isActiveMember = true;
+        LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, peerAddresses, isActiveMember);
+
+        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        if (schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
+        }
+    }
+
+    private void doCreateShard(final CreateShard createShard) {
+        final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        final String shardName = moduleShardConfig.getShardName();
 
         configuration.addModuleShardConfiguration(moduleShardConfig);
 
@@ -1078,6 +1153,42 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return false;
     }
 
+
+    // With this message the shard does NOT have to be preconfigured
+    // do a dynamic lookup if the shard exists somewhere and replicate
+    private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
+        final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
+
+        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+        if (schemaContext == null) {
+            final String msg = String.format(
+                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        findPrimary(shardName,
+                new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+                    @Override
+                    public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+                        getSelf().tell(new RunnableMessage() {
+                            @Override
+                            public void run() {
+                                addShard(getShardName(), response, getSender());
+                            }
+                        }, getTargetActor());
+                    }
+
+                    @Override
+                    public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+                        sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+                    }
+                }
+        );
+    }
+
     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/PrefixShardStrategy.java
new file mode 100644 (file)
index 0000000..1e08a98
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.shardstrategy;
+
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Shard Strategy that resolves a path to a prefix shard name.
+ */
+public class PrefixShardStrategy implements ShardStrategy {
+
+    public static final String NAME = "prefix";
+
+    private final String shardName;
+    private final Configuration configuration;
+
+    public PrefixShardStrategy(final String shardName, final Configuration configuration) {
+        this.shardName = shardName;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public String findShard(final YangInstanceIdentifier path) {
+        final String shardNameForPrefix = configuration.getShardNameForPrefix(path);
+        return shardNameForPrefix != null ? shardName : DefaultShardStrategy.DEFAULT_SHARD;
+    }
+}
index a26dc56..a97adc2 100644 (file)
@@ -25,10 +25,16 @@ public class ShardStrategyFactory {
     public ShardStrategy getStrategy(final YangInstanceIdentifier path) {
         Preconditions.checkNotNull(path, "path should not be null");
 
-        String moduleName = getModuleName(path);
-        ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName);
+        // try with the legacy module based shard mapping
+        final String moduleName = getModuleName(path);
+        final ShardStrategy shardStrategy = configuration.getStrategyForModule(moduleName);
         if (shardStrategy == null) {
-            return DefaultShardStrategy.getInstance();
+            // retry with prefix based sharding
+            final ShardStrategy strategyForPrefix = configuration.getStrategyForPrefix(path);
+            if (strategyForPrefix == null) {
+                return DefaultShardStrategy.getInstance();
+            }
+            return strategyForPrefix;
         }
 
         return shardStrategy;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ClusterUtils.java
new file mode 100644 (file)
index 0000000..dba1761
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Utils for encoding prefix shard name.
+ */
+public class ClusterUtils {
+
+    public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
+        return ShardIdentifier
+                .create(getCleanShardName(prefix.getRootIdentifier()), memberName, prefix.getDatastoreType().name());
+    }
+
+    /**
+     * Returns an encoded shard name based on the provided path that should doesn't contain characters that cannot be
+     * present in akka actor paths.
+     *
+     * @param path Path on which to base the shard name
+     * @return encoded name that doesn't contain characters that cannot be in actor path.
+     */
+    public static String getCleanShardName(final YangInstanceIdentifier path) {
+        final StringBuilder builder = new StringBuilder();
+        // TODO need a better mapping that includes namespace, but we'll need to cleanup the string beforehand
+        path.getPathArguments().forEach(p -> {
+            builder.append(p.getNodeType().getLocalName());
+            builder.append("!");
+        });
+        return builder.toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
new file mode 100644 (file)
index 0000000..15e6b03
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+public class AbstractShardManagerTest extends AbstractActorTest {
+
+    protected static final MemberName MEMBER_1 = MemberName.forName("member-1");
+
+    protected static int ID_COUNTER = 1;
+    protected static TestActorRef<MessageCollectorActor> mockShardActor;
+    protected static ShardIdentifier mockShardName;
+
+    protected final String shardMrgIDSuffix = "config" + ID_COUNTER++;
+    protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+    protected final Collection<ActorSystem> actorSystems = new ArrayList<>();
+    protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
+            .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+            .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
+
+    @Mock
+    protected static CountDownLatch ready;
+
+    protected TestShardManager.Builder newTestShardMgrBuilder() {
+        return TestShardManager.builder(datastoreContextBuilder);
+    }
+
+    protected TestShardManager.Builder newTestShardMgrBuilder(final Configuration config) {
+        return TestShardManager.builder(datastoreContextBuilder).configuration(config);
+    }
+
+    protected Props newShardMgrProps(final Configuration config) {
+        return newTestShardMgrBuilder(config).waitTillReadyCountDownLatch(ready).props();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        initMocks(this);
+
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
+        if (mockShardActor == null) {
+            mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
+            mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
+                    mockShardName.toString());
+        }
+
+        mockShardActor.underlyingActor().clear();
+    }
+
+    @After
+    public void tearDown() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
+        for (final ActorSystem system : actorSystems) {
+            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
+        }
+
+        mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        mockShardActor = null;
+
+        actorFactory.close();
+    }
+}
index 5dc04f2..8256edd 100644 (file)
@@ -17,6 +17,8 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
 
 public abstract class AbstractTest {
     protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
+    protected static final MemberName MEMBER_2_NAME = MemberName.forName("member-2");
+
     private static final FrontendType FRONTEND_TYPE = FrontendType.forName(ShardTransactionTest.class.getSimpleName());
 
     protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
index 57dedf2..f09441e 100644 (file)
@@ -68,14 +68,14 @@ public class IntegrationTestKit extends ShardTestKit {
                 SchemaContextHelper.full(), shardNames);
     }
 
-    public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
-            boolean waitUntilLeader, String... shardNames) {
+    public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+            final boolean waitUntilLeader, final String... shardNames) {
         return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
                 SchemaContextHelper.full(), shardNames);
     }
 
-    public DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig,
-            boolean waitUntilLeader, SchemaContext schemaContext, String... shardNames) {
+    public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+            final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
         final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
 
@@ -230,9 +230,12 @@ public class IntegrationTestKit extends ShardTestKit {
 
     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
             Class<? extends Exception> expType) throws Exception {
-        assertExceptionOnCall(() -> {
-            txChain.newWriteOnlyTransaction();
-            return null;
+        assertExceptionOnCall(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                txChain.newWriteOnlyTransaction();
+                return null;
+            }
         }, expType);
 
         assertExceptionOnCall(() -> {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/PrefixShardCreationTest.java
new file mode 100644 (file)
index 0000000..2a3431f
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
+import akka.actor.Status.Success;
+import akka.cluster.Cluster;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.Shard.Builder;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerTest.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests prefix shard creation in ShardManager.
+ */
+public class PrefixShardCreationTest extends AbstractShardManagerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefixShardCreationTest.class);
+
+    private static final DOMDataTreeIdentifier TEST_ID =
+            new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+    private static final MemberName MEMBER_2 = MemberName.forName("member-2");
+
+    @Test
+    public void testPrefixShardCreation() throws Exception {
+
+        LOG.info("testPrefixShardCreation starting");
+        new JavaTestKit(getSystem()) {
+            {
+                datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+                final ActorRef shardManager = actorFactory.createActor(newShardMgrProps(
+                        new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+                final SchemaContext schemaContext = TestModel.createTestContext();
+                shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+                shardManager.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                final Builder builder = Shard.builder();
+
+                final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
+                        new PrefixShardConfiguration(TEST_ID,
+                                PrefixShardStrategy.NAME,
+                                Collections.singletonList(MEMBER_1)),
+                        datastoreContextBuilder.build(), builder);
+
+                shardManager.tell(createPrefixedShard, getRef());
+                expectMsgClass(duration("5 seconds"), Success.class);
+
+                shardManager.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+            }
+        };
+    }
+
+    @Test
+    public void testPrefixShardReplicas() throws Exception {
+        LOG.info("testPrefixShardReplicas starting");
+        final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create ACtorSystem for member-1
+        final ActorSystem system1 = newActorSystem("Member1");
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
+                newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+                        .waitTillReadyCountDownLatch(ready)
+                        .cluster(new ClusterWrapperImpl(system1))
+                        .props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = newActorSystem("Member2");
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
+                newTestShardMgrBuilder()
+                        .configuration(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
+                        .waitTillReadyCountDownLatch(ready)
+                        .cluster(new ClusterWrapperImpl(system2)).props().withDispatcher(
+                        Dispatchers.DefaultDispatcherId()),
+                shardManagerID);
+
+        final JavaTestKit kit2 = new JavaTestKit(system2);
+
+        new JavaTestKit(system1) {
+            {
+                shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+                shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+                // check shard does not exist
+                shardManager1.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                shardManager2.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                // create shard on node1
+                final Builder builder = Shard.builder();
+
+                final CreatePrefixedShard createPrefixedShard = new CreatePrefixedShard(
+                        new PrefixShardConfiguration(TEST_ID,
+                                PrefixShardStrategy.NAME,
+                                Lists.newArrayList(MEMBER_1, MEMBER_2)),
+                        datastoreContextBuilder.build(), builder);
+
+                shardManager1.tell(createPrefixedShard, getRef());
+                expectMsgClass(duration("5 seconds"), Success.class);
+
+                shardManager1.underlyingActor().waitForMemberUp();
+
+                LOG.info("changed leader state");
+
+                // check node2 cannot find it locally
+                shardManager1.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+                shardManager2.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+                // but can remotely
+                shardManager2.tell(new FindPrimary(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
+
+                // add replica and verify if succesful
+                shardManager2.tell(new AddPrefixShardReplica(TEST_ID.getRootIdentifier()), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), Success.class);
+
+                // verify we have a replica on manager2 now
+                shardManager2.tell(new FindLocalShard(
+                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), kit2.getRef());
+                kit2.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+            }
+        };
+    }
+
+    private ActorSystem newActorSystem(final String config) {
+        final ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
+        actorSystems.add(system);
+        return system;
+    }
+}
\ No newline at end of file
index 705f3c4..521a0b8 100644 (file)
@@ -47,7 +47,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.AbstractMap;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -61,21 +60,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.SerializationUtils;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.AbstractShardManagerTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.ShardManager.SchemaContextModules;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -111,7 +105,6 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
@@ -137,55 +130,13 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class ShardManagerTest extends AbstractActorTest {
+public class ShardManagerTest extends AbstractShardManagerTest {
     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class);
-    private static final MemberName MEMBER_1 = MemberName.forName("member-1");
     private static final MemberName MEMBER_2 = MemberName.forName("member-2");
     private static final MemberName MEMBER_3 = MemberName.forName("member-3");
 
-    private static int ID_COUNTER = 1;
-
-    private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
-    @Mock
-    private static CountDownLatch ready;
-
-    private static ShardIdentifier mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
-
-    private static TestActorRef<MessageCollectorActor> mockShardActor = TestActorRef.create(getSystem(),
-            Props.create(MessageCollectorActor.class), mockShardName.toString());
-
-    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
-            .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
-            .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
-
-    private final Collection<ActorSystem> actorSystems = new ArrayList<>();
-
-    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-
-        InMemoryJournal.clear();
-        InMemorySnapshotStore.clear();
-
-        mockShardActor.underlyingActor().clear();
-    }
-
-    @After
-    public void tearDown() {
-        InMemoryJournal.clear();
-        InMemorySnapshotStore.clear();
-
-        for (ActorSystem system: actorSystems) {
-            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
-        }
-
-        actorFactory.close();
-    }
-
     private ActorSystem newActorSystem(String config) {
         ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
         actorSystems.add(system);
@@ -205,10 +156,6 @@ public class ShardManagerTest extends AbstractActorTest {
         return newShardMgrProps(new MockConfiguration());
     }
 
-    private Props newShardMgrProps(Configuration config) {
-        return newTestShardMgrBuilder(config).props();
-    }
-
     private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) {
         DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext();
@@ -216,14 +163,6 @@ public class ShardManagerTest extends AbstractActorTest {
         return mockFactory;
     }
 
-    private TestShardManager.Builder newTestShardMgrBuilder() {
-        return TestShardManager.builder(datastoreContextBuilder);
-    }
-
-    private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) {
-        return TestShardManager.builder(datastoreContextBuilder).configuration(config);
-    }
-
     private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() {
         return newTestShardMgrBuilderWithMockShardActor(mockShardActor);
     }
@@ -2157,7 +2096,7 @@ public class ShardManagerTest extends AbstractActorTest {
         };
     }
 
-    private static class TestShardManager extends ShardManager {
+    public static class TestShardManager extends ShardManager {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
         private ShardManagerSnapshot snapshot;
@@ -2225,7 +2164,7 @@ public class ShardManagerTest extends AbstractActorTest {
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
         }
 
-        void waitForMemberUp() {
+        public void waitForMemberUp() {
             assertEquals("MemberUp received", true,
                     Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
             memberUpReceived = new CountDownLatch(1);
@@ -2260,7 +2199,7 @@ public class ShardManagerTest extends AbstractActorTest {
             return new Builder(datastoreContextBuilder);
         }
 
-        private static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
+        public static class Builder extends AbstractGenericCreator<Builder, TestShardManager> {
             private ActorRef shardActor;
             private final Map<String, ActorRef> shardActors = new HashMap<>();
 
index 312d0ef..3a4c383 100644 (file)
@@ -14,9 +14,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class MockConfiguration extends ConfigurationImpl {
     public MockConfiguration() {
@@ -36,4 +39,9 @@ public class MockConfiguration extends ConfigurationImpl {
             return retMap;
         });
     }
+
+    @Override
+    public ShardStrategy getStrategyForPrefix(@Nonnull final YangInstanceIdentifier prefix) {
+        return null;
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.